-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
377 lines (306 loc) · 12.5 KB
/
main.py
File metadata and controls
377 lines (306 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from typing import List, Optional
import uvicorn
from config import load_config
from analyzer import WrapAnalyzer
from clients import TautulliClient
from models import User, WrapData
from pregenerate import WrapStorage
app = FastAPI(title="Plex Wrapped API", version="1.0.0")
# CORS middleware - allow all origins for LAN access
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for LAN access
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Load configuration (will be loaded fresh on each request for flexibility)
def get_settings():
return load_config()
def get_analyzer():
return WrapAnalyzer(get_settings())
@app.get("/")
async def root():
return {"message": "Plex Wrapped API", "version": "1.0.0"}
@app.get("/api/health")
async def health():
"""Health check endpoint"""
try:
# Test connections
settings = get_settings()
tautulli = TautulliClient(settings.tautulli_url, settings.tautulli_api_key)
tautulli.get_users()
return {"status": "healthy", "services": {"tautulli": "connected"}}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
@app.get("/api/users", response_model=List[User])
async def get_users():
"""Get all users from Plex/Tautulli"""
try:
analyzer = get_analyzer()
users = analyzer.tautulli.get_users()
return [
User(
id=str(u.get("user_id", "")),
username=u.get("username", "") or u.get("friendly_name", ""),
title=u.get("friendly_name"),
thumb=u.get("thumb"),
)
for u in users
]
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch users: {str(e)}")
def filter_generated_images(wrap_data: dict, image_generation_enabled: bool) -> dict:
"""Remove generated_image fields from cards if image generation is disabled"""
if image_generation_enabled:
return wrap_data
# Create a copy to avoid modifying the original
filtered_data = wrap_data.copy()
# Filter out generated_image from cards if present
if "cards" in filtered_data and isinstance(filtered_data["cards"], list):
filtered_cards = []
for card in filtered_data["cards"]:
if isinstance(card, dict):
filtered_card = {
k: v for k, v in card.items() if k != "generated_image"
}
filtered_cards.append(filtered_card)
else:
filtered_cards.append(card)
filtered_data["cards"] = filtered_cards
return filtered_data
@app.get("/api/wrap/{username}", response_model=WrapData)
async def get_wrap(username: str):
"""Get wrap for a specific user (from pre-generated storage)"""
try:
storage = WrapStorage()
settings = get_settings()
# Try to load from storage first
wrap_data = storage.load_wrap(username)
if wrap_data:
# Filter out generated images if image generation is disabled
wrap_data = filter_generated_images(
wrap_data, settings.use_image_generation
)
# Convert dict back to WrapData model
return WrapData(**wrap_data)
# If not found in storage, return 404
raise HTTPException(
status_code=404,
detail=f"Wrap not found for user '{username}'. Please run pregenerate.py to generate wraps.",
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to load wrap: {str(e)}")
@app.get("/api/wrap-by-token/{token}", response_model=WrapData)
async def get_wrap_by_token(token: str):
"""Get wrap by shareable token"""
try:
storage = WrapStorage()
settings = get_settings()
# Try to load wrap by token
wrap_data = storage.load_wrap_by_token(token)
if wrap_data:
# Filter out generated images if image generation is disabled
wrap_data = filter_generated_images(
wrap_data, settings.use_image_generation
)
# Convert dict back to WrapData model
return WrapData(**wrap_data)
# If not found, return 404
raise HTTPException(
status_code=404,
detail="Wrap not found for this token. The token may be invalid or the wrap may not exist.",
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to load wrap: {str(e)}")
@app.get("/api/token/{username}")
async def get_token_for_user(username: str):
"""Get shareable token for a username"""
try:
storage = WrapStorage()
token = storage.get_token_for_user(username)
if token:
return {"username": username, "token": token}
else:
raise HTTPException(
status_code=404,
detail=f"Could not generate token for user '{username}'. Wrap may not exist.",
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get token: {str(e)}")
@app.get("/api/plex-image/{thumb_path:path}")
async def get_plex_image(thumb_path: str):
"""Proxy Plex images through Tautulli's pms_image_proxy"""
import httpx
from urllib.parse import quote
try:
settings = get_settings()
# Remove leading slash if present
thumb_path = thumb_path.lstrip("/")
# Use Tautulli's pms_image_proxy to fetch images
# URL encode the img path to handle special characters
tautulli_base = settings.tautulli_url.rstrip("/")
img_path = f"/{thumb_path}"
full_url = f"{tautulli_base}/api/v2?apikey={settings.tautulli_api_key}&cmd=pms_image_proxy&img={quote(img_path, safe='')}"
# Fetch the image through Tautulli
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
response = await client.get(full_url)
response.raise_for_status()
# Determine content type from response headers or default to image/jpeg
content_type = response.headers.get("content-type", "image/jpeg")
# Return the image with appropriate headers
return Response(
content=response.content,
media_type=content_type,
headers={
"Cache-Control": "public, max-age=86400", # Cache for 1 day
},
)
except httpx.ConnectError as e:
raise HTTPException(
status_code=502,
detail=f"Cannot connect to Tautulli at {settings.tautulli_url}. If running in Docker, use 'host.docker.internal' instead of 'localhost'. Error: {str(e)}",
)
except httpx.HTTPError as e:
raise HTTPException(
status_code=502, detail=f"Failed to fetch image from Tautulli: {str(e)}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to proxy image: {str(e)}")
@app.get("/api/generated-image-info")
async def get_generated_image_info(path: str):
"""Get metadata for generated image (modification time for cache-busting)"""
from pathlib import Path
import os
try:
# Use the path from query parameter
image_path = path.replace("%2F", "/").replace("%5C", "\\")
# Construct full path relative to project root
project_root = Path.cwd()
full_path = (project_root / image_path).resolve()
# Security: Ensure path is within project directory
if not str(full_path).startswith(str(project_root.resolve())):
raise HTTPException(status_code=403, detail="Access denied")
# Check if file exists
if not full_path.exists():
raise HTTPException(status_code=404, detail="Image not found")
# Get file modification time
mtime = os.path.getmtime(full_path)
return {"mtime": int(mtime), "exists": True}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to get image info: {str(e)}"
)
@app.get("/api/generated-image")
async def get_generated_image(path: str, t: Optional[int] = None):
"""Serve generated card images
Args:
path: Path to the image file
t: Optional timestamp query parameter for cache-busting
"""
from fastapi.responses import FileResponse
from pathlib import Path
import os
try:
# Use the path from query parameter
image_path = path.replace("%2F", "/").replace("%5C", "\\")
# Construct full path relative to project root
project_root = Path.cwd()
full_path = (project_root / image_path).resolve()
# Security: Ensure path is within project directory
if not str(full_path).startswith(str(project_root.resolve())):
raise HTTPException(status_code=403, detail="Access denied")
# Check if file exists
if not full_path.exists():
raise HTTPException(status_code=404, detail="Image not found")
# Get file modification time for cache-busting
mtime = os.path.getmtime(full_path)
# Return file with cache headers that allow revalidation
# Use must-revalidate so browser checks if file changed
# Also include Last-Modified header for proper cache validation
from email.utils import formatdate
last_modified = formatdate(mtime, usegmt=True)
return FileResponse(
full_path,
media_type="image/png",
headers={
"Cache-Control": "public, max-age=3600, must-revalidate", # Cache for 1 hour, but revalidate
"Last-Modified": last_modified, # Include modification time for cache validation
},
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to serve image: {str(e)}")
@app.get("/api/debug/history/{username}")
async def debug_history(username: str):
"""Debug endpoint to see raw history data"""
try:
analyzer = get_analyzer()
tautulli_users = analyzer.tautulli.get_users()
user_data = next(
(
u
for u in tautulli_users
if u.get("username") == username or u.get("friendly_name") == username
),
None,
)
if not user_data:
raise HTTPException(status_code=404, detail=f"User {username} not found")
user_id = user_data.get("user_id")
# Try without date filters first
history_no_filter = analyzer.tautulli.get_user_history(
user_id=user_id, start_date=None, end_date=None
)
# Try with date filters
history_with_filter = analyzer.tautulli.get_user_history(
user_id=user_id,
start_date=analyzer.settings.start_date,
end_date=analyzer.settings.end_date,
)
# Try direct API call to see raw response
import requests
url = f"{analyzer.tautulli.base_url}/api/v2"
test_params = {
"apikey": analyzer.tautulli.api_key,
"cmd": "get_history",
"user_id": int(user_id) if user_id else None,
"length": 10,
}
test_response = requests.get(url, params=test_params)
raw_data = test_response.json() if test_response.status_code == 200 else None
return {
"user_id": user_id,
"user_data": user_data,
"username": username,
"date_range": {
"start": analyzer.settings.start_date,
"end": analyzer.settings.end_date,
},
"history_no_filter": {
"count": len(history_no_filter),
"sample": history_no_filter[:3] if history_no_filter else [],
},
"history_with_filter": {
"count": len(history_with_filter),
"sample": history_with_filter[:3] if history_with_filter else [],
},
"raw_api_response": raw_data,
}
except Exception as e:
import traceback
return {"error": str(e), "traceback": traceback.format_exc()}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8766)