-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathanalyzer.py
More file actions
1301 lines (1163 loc) · 50.2 KB
/
analyzer.py
File metadata and controls
1301 lines (1163 loc) · 50.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from typing import List, Dict, Optional
from datetime import datetime, timedelta, timezone as dt_timezone
from collections import Counter, defaultdict
import asyncio
try:
from zoneinfo import ZoneInfo
except ImportError:
from backports.zoneinfo import ZoneInfo # Python < 3.9 fallback
from clients import TautulliClient, ImageGenerationClient
from clients.llm_client import LLMClient
from models import (
WrapData,
Insight,
GenreStat,
ActorStat,
DeviceStat,
BingeSession,
User,
)
from config import Settings
class WrapAnalyzer:
    def __init__(self, settings: Settings):
        """Wire up API clients and locale options from the app settings.

        Args:
            settings: Application configuration (Tautulli/LLM/image-gen
                credentials, timezone name, hemisphere flag).
        """
        self.settings = settings
        # Tautulli is the data source for all watch history and metadata.
        self.tautulli = TautulliClient(settings.tautulli_url, settings.tautulli_api_key)
        # Optional LLM used for generated insight text; disabled via settings.use_llm.
        self.llm = LLMClient(
            api_key=settings.openai_api_key,
            enabled=settings.use_llm,
            name_mappings=settings.name_mappings,
            custom_prompt_context=settings.custom_prompt_context,
            base_url=settings.openai_base_url,
            model=settings.openai_model,
        )
        # Optional image generation client; disabled via settings.use_image_generation.
        self.image_gen = ImageGenerationClient(
            api_key=settings.google_image_api_key,
            enabled=settings.use_image_generation,
            name_mappings=settings.name_mappings,
        )
        # Timezone configuration for accurate time-of-day analysis.
        # A bad zone name degrades to server-local time rather than failing startup.
        self.tz = None
        if settings.timezone:
            try:
                self.tz = ZoneInfo(settings.timezone)
            except Exception as e:
                print(
                    f"Warning: Invalid timezone '{settings.timezone}', using server local time: {e}"
                )
        # Hemisphere setting for correct season mapping (seasons flip south of the equator).
        self.southern_hemisphere = settings.southern_hemisphere
def _convert_timestamp(self, timestamp: int | float) -> datetime:
"""Convert Unix timestamp to datetime in the configured timezone"""
if self.tz:
# Convert to UTC first, then to target timezone
utc_dt = datetime.fromtimestamp(timestamp, tz=dt_timezone.utc)
return utc_dt.astimezone(self.tz)
else:
# Fall back to server local time
return datetime.fromtimestamp(timestamp)
def format_duration(self, minutes: int) -> str:
"""Format minutes into human-readable duration"""
if minutes < 60:
return f"{minutes} minutes"
hours = minutes // 60
mins = minutes % 60
if hours < 24:
return f"{hours}h {mins}m" if mins > 0 else f"{hours}h"
days = hours // 24
hours = hours % 24
return f"{days}d {hours}h" if hours > 0 else f"{days} days"
def _extract_genres(self, item: Dict) -> List[str]:
"""Extract genres from an item (handles multiple Tautulli formats)"""
item_genres = []
# Check direct field first
if "genres" in item:
item_genres = item.get("genres", []) or []
# Check nested media_info structure if direct field is empty
if (
not item_genres
and "media_info" in item
and isinstance(item.get("media_info"), dict)
):
media_info = item.get("media_info", {})
item_genres = media_info.get("genres", []) or []
# Handle list of dictionaries (Tautulli format: [{"tag": "Action"}, ...])
if item_genres and isinstance(item_genres, list) and len(item_genres) > 0:
if isinstance(item_genres[0], dict):
item_genres = [g.get("tag", "") for g in item_genres if g.get("tag")]
# Check if it's a string
elif isinstance(item_genres, str):
item_genres = [g.strip() for g in item_genres.split(",") if g.strip()]
# If still empty, try to fetch from Tautulli using rating_key
if not item_genres and item.get("rating_key"):
try:
metadata = self.tautulli.get_metadata(str(item.get("rating_key")))
if metadata and metadata.get("genres"):
item_genres = metadata.get("genres", [])
except Exception:
pass # Silently fail if metadata fetch fails
return item_genres
def calculate_days_between(self, start: str, end: str) -> int:
"""Calculate days between two dates"""
start_date = datetime.strptime(start, "%Y-%m-%d")
end_date = datetime.strptime(end, "%Y-%m-%d")
return (end_date - start_date).days
    def analyze_history(
        self, history: List[Dict], start_date: str, end_date: str
    ) -> Dict:
        """Analyze watch history and extract insights.

        Walks every Tautulli history row once, accumulating watch-time totals,
        per-genre/actor/director/device counters, music stats, binge/session
        data and calendar patterns, then assembles the summary dict consumed
        downstream.

        Args:
            history: Raw Tautulli history rows (one dict per play).
            start_date: Range start, "YYYY-MM-DD" (used for consistency stats).
            end_date: Range end, "YYYY-MM-DD".

        Returns:
            Dict of aggregated statistics; empty dict when history is empty.
        """
        if not history:
            return {}
        # --- Running totals -------------------------------------------------
        total_watch_time = 0  # minutes, across all watched items
        items_watched = set()  # unique show/movie titles
        episodes_watched = 0
        movies_watched = 0
        tracks_listened = 0
        music_listen_time = 0  # minutes of music playback
        # Counters keyed by name, weighted by minutes watched.
        genres = Counter()
        actors = Counter()
        directors = Counter()
        # NOTE(review): content_ratings is never populated or returned below --
        # looks like a leftover; candidate for removal.
        content_ratings = Counter()
        devices = Counter()
        platforms = Counter()
        # Music-specific counters
        artists = Counter()
        albums = Counter()
        tracks = Counter()
        music_genres = Counter()
        # Group by date for binge detection
        watches_by_date = defaultdict(list)
        # Track repeat watches (content watched multiple times)
        content_watch_count = Counter()  # title -> count
        content_watch_times = defaultdict(list)  # title -> list of timestamps
        # Track seasonal patterns (by month)
        watches_by_month = defaultdict(
            lambda: {"time": 0, "count": 0, "genres": Counter()}
        )
        # Track time-of-day patterns (hour of day: 0-23)
        watches_by_hour = defaultdict(lambda: {"time": 0, "count": 0})
        # Track day-of-week patterns (Monday=0, Sunday=6)
        watches_by_weekday = defaultdict(lambda: {"time": 0, "count": 0})
        # Track daily watch times for consistency analysis
        daily_watch_times = []  # List of minutes watched per day
        # Track continuous sessions (for longest stretch detection)
        watch_sessions = []  # List of (start_time, end_time, duration_min)
        for item in history:
            # Tautulli returns duration in SECONDS (not milliseconds!)
            # Try multiple field names for duration
            duration = item.get("duration", 0) or item.get("media_duration", 0) or 0
            # For watched duration, use play_duration (in seconds) or calculate from stopped - started
            # NOTE(review): Python precedence parses this expression as
            #   (play_duration or (stopped - started)) if (stopped and started)
            #   else (watched_duration or 0)
            # so play_duration is ignored whenever stopped/started are missing
            # -- confirm this is intended.
            watched_duration = (
                item.get("play_duration", 0)
                or (item.get("stopped", 0) - item.get("started", 0))
                if (item.get("stopped", 0) and item.get("started", 0))
                else 0 or item.get("watched_duration", 0) or 0
            )
            # Convert seconds to minutes
            duration_min = duration / 60 if duration > 0 else 0
            watched_min = watched_duration / 60 if watched_duration > 0 else 0
            # Count if watched - be more lenient with criteria
            # Tautulli marks items as watched if they've been watched at least once
            watched_status = item.get("watched_status", 0)
            # Also check if there's a stopped time (means it was played)
            has_stopped = item.get("stopped", 0) > 0
            # Count if: marked as watched, or has significant watch time (>50% or >5 minutes)
            is_watched = (
                watched_status == 1
                or has_stopped
                or (
                    duration_min > 0
                    and watched_min > 0
                    and (watched_min / duration_min) >= 0.5
                )
                or (watched_min >= 5)  # At least 5 minutes watched
            )
            if is_watched and watched_min > 0:
                total_watch_time += watched_min
                media_type = item.get("media_type", "") or item.get("type", "")
                title = item.get("title", "") or item.get("full_title", "")
                grandparent_title = item.get("grandparent_title", "") or item.get(
                    "parent_title", ""
                )
                # For episodes the show name lives in grandparent_title.
                show_title = grandparent_title or title
                # Handle music tracks separately
                if media_type == "track":
                    tracks_listened += 1
                    music_listen_time += watched_min
                    # Track artist (grandparent_title for music)
                    artist_name = grandparent_title or item.get("artist", "")
                    if artist_name:
                        artists[artist_name] += watched_min
                    # Track album (parent_title for music)
                    album_name = item.get("parent_title", "") or item.get("album", "")
                    if album_name:
                        albums[album_name] += watched_min
                    # Track song
                    if title:
                        tracks[title] += watched_min
                    # Track music genres
                    item_genres = self._extract_genres(item)
                    for genre in item_genres:
                        if genre:
                            music_genres[genre] += watched_min
                    continue  # Skip to next item after processing music
                # --- Video classification: episode vs movie -----------------
                if media_type == "episode" or "episode" in str(media_type).lower():
                    episodes_watched += 1
                    items_watched.add(show_title)
                elif media_type == "movie" or "movie" in str(media_type).lower():
                    movies_watched += 1
                    items_watched.add(title)
                else:
                    # Fallback: treat as show if has grandparent_title
                    if grandparent_title:
                        episodes_watched += 1
                        items_watched.add(show_title)
                    else:
                        movies_watched += 1
                        items_watched.add(title)
                # Extract genres
                item_genres = self._extract_genres(item)
                for genre in item_genres:
                    if genre:
                        genres[genre] += watched_min
                # Extract actors (Tautulli format)
                item_actors = []
                # Check direct field first
                if "actors" in item:
                    item_actors = item.get("actors", []) or []
                # Check nested media_info structure if direct field is empty
                if (
                    not item_actors
                    and "media_info" in item
                    and isinstance(item.get("media_info"), dict)
                ):
                    media_info = item.get("media_info", {})
                    item_actors = media_info.get("actors", []) or []
                # Handle list of dictionaries (Tautulli format: [{"tag": "Actor Name"}, ...])
                if (
                    item_actors
                    and isinstance(item_actors, list)
                    and len(item_actors) > 0
                ):
                    if isinstance(item_actors[0], dict):
                        item_actors = [
                            a.get("tag", "") for a in item_actors if a.get("tag")
                        ]
                # Check if it's a string
                elif isinstance(item_actors, str):
                    item_actors = [
                        a.strip() for a in item_actors.split(",") if a.strip()
                    ]
                # If still empty, try to fetch from Tautulli using rating_key
                if not item_actors and item.get("rating_key"):
                    try:
                        metadata = self.tautulli.get_metadata(
                            str(item.get("rating_key"))
                        )
                        if metadata and metadata.get("actors"):
                            item_actors = metadata.get("actors", [])
                    except Exception:
                        pass  # Silently fail if metadata fetch fails
                for actor in item_actors[:5]:  # Top 5 actors
                    if actor:
                        actors[actor] += watched_min
                # Extract directors (same multi-shape handling as actors above)
                item_directors = []
                # Check direct field first
                if "directors" in item:
                    item_directors = item.get("directors", []) or []
                # Check nested media_info structure if direct field is empty
                if (
                    not item_directors
                    and "media_info" in item
                    and isinstance(item.get("media_info"), dict)
                ):
                    media_info = item.get("media_info", {})
                    item_directors = media_info.get("directors", []) or []
                # Handle list of dictionaries (Tautulli format: [{"tag": "Director Name"}, ...])
                if (
                    item_directors
                    and isinstance(item_directors, list)
                    and len(item_directors) > 0
                ):
                    if isinstance(item_directors[0], dict):
                        item_directors = [
                            d.get("tag", "") for d in item_directors if d.get("tag")
                        ]
                # Check if it's a string
                elif isinstance(item_directors, str):
                    item_directors = [
                        d.strip() for d in item_directors.split(",") if d.strip()
                    ]
                # If still empty, try to fetch from Tautulli using rating_key
                if not item_directors and item.get("rating_key"):
                    try:
                        metadata = self.tautulli.get_metadata(
                            str(item.get("rating_key"))
                        )
                        if metadata and metadata.get("directors"):
                            item_directors = metadata.get("directors", [])
                    except Exception:
                        pass  # Silently fail if metadata fetch fails
                for director in item_directors:
                    if director:
                        directors[director] += watched_min
                # Devices and platforms
                player = item.get("player", "") or item.get("platform", "")
                platform = item.get("platform", "") or item.get("platform_name", "")
                if player:
                    devices[player] += watched_min
                if platform and platform != player:
                    platforms[platform] += watched_min
                # Track content for repeat detection
                # Only track movies for repeat watches - TV episodes are not re-watches
                # (watching different episodes of a show is normal viewing, not re-watching)
                # Check multiple indicators to detect episodes more reliably
                has_season_number = (
                    item.get("season_number") is not None
                    or item.get("parent_index") is not None
                )
                has_episode_number = (
                    item.get("episode_number") is not None
                    or item.get("index") is not None
                )
                is_episode = (
                    media_type == "episode"
                    or "episode" in str(media_type).lower()
                    or grandparent_title
                    or has_season_number
                    or has_episode_number
                )
                # Initialize content_key for watch sessions (use show_title for episodes, title for movies)
                content_key = show_title if is_episode else (title or "Unknown")
                # Only track movies (non-episodes) for repeat detection
                # Use title (not content_key) to ensure we're tracking the actual movie title
                if not is_episode and title:
                    # Only track movies for repeat detection
                    content_watch_count[title] += 1
                    # Track when it was watched
                    date_timestamp = item.get("date", 0) or item.get("started", 0)
                    if date_timestamp:
                        content_watch_times[title].append(date_timestamp)
                # Group by date (Tautulli uses Unix timestamps)
                date_timestamp = item.get("date", 0) or item.get("started", 0)
                started_ts = item.get("started", 0)
                if date_timestamp:
                    # Convert Unix timestamp to date string (using configured timezone)
                    try:
                        if isinstance(date_timestamp, (int, float)):
                            date_obj = self._convert_timestamp(date_timestamp)
                            date_str = date_obj.strftime("%Y-%m-%d")
                            month_key = date_obj.strftime("%Y-%m")  # YYYY-MM format
                            # Track day of week (Monday=0, Sunday=6)
                            weekday = date_obj.weekday()
                            watches_by_weekday[weekday]["time"] += watched_min
                            watches_by_weekday[weekday]["count"] += 1
                            # Track seasonal patterns
                            watches_by_month[month_key]["time"] += watched_min
                            watches_by_month[month_key]["count"] += 1
                            for genre in item_genres:
                                if genre:
                                    watches_by_month[month_key]["genres"][
                                        genre
                                    ] += watched_min
                        else:
                            # If it's already a string, try to parse it
                            date_str = (
                                str(date_timestamp).split(" ")[0]
                                if " " in str(date_timestamp)
                                else str(date_timestamp)
                            )
                        watches_by_date[date_str].append(item)
                        # Track time-of-day patterns (using started timestamp in configured timezone)
                        if started_ts and isinstance(started_ts, (int, float)):
                            try:
                                start_dt = self._convert_timestamp(started_ts)
                                hour = start_dt.hour
                                watches_by_hour[hour]["time"] += watched_min
                                watches_by_hour[hour]["count"] += 1
                            except (ValueError, OSError, TypeError):
                                pass
                        # Track watch sessions for continuous stretch detection
                        stopped_ts = item.get("stopped", 0) or started_ts
                        if started_ts and stopped_ts:
                            watch_sessions.append(
                                {
                                    "start": started_ts,
                                    "end": stopped_ts,
                                    "duration": watched_min,
                                    "title": content_key,
                                }
                            )
                    except (ValueError, OSError, TypeError):
                        # If conversion fails, skip date grouping for this item
                        pass
        # --- Aggregation into model objects ---------------------------------
        # Calculate genre percentages
        total_genre_time = sum(genres.values())
        genre_stats = []
        for genre, time in genres.most_common(10):
            genre_stats.append(
                GenreStat(
                    genre=genre,
                    watch_time=int(round(time)),
                    # NOTE(review): this membership test reads the raw "genres"
                    # field, which may hold tag dicts rather than strings, so
                    # counts can be understated -- verify against
                    # _extract_genres output.
                    count=sum(1 for item in history if genre in item.get("genres", [])),
                    percentage=(
                        (time / total_genre_time * 100) if total_genre_time > 0 else 0
                    ),
                )
            )
        # Top actors
        actor_stats = [
            ActorStat(
                name=name,
                watch_time=int(round(time)),
                count=sum(1 for item in history if name in item.get("actors", [])),
            )
            for name, time in actors.most_common(10)
        ]
        # Top directors
        director_stats = [
            ActorStat(
                name=name,
                watch_time=int(round(time)),
                count=sum(1 for item in history if name in item.get("directors", [])),
            )
            for name, time in directors.most_common(10)
        ]
        # Device stats
        total_device_time = sum(devices.values())
        device_stats = [
            DeviceStat(
                device=device,
                watch_time=int(round(time)),
                percentage=(
                    (time / total_device_time * 100) if total_device_time > 0 else 0
                ),
            )
            for device, time in devices.most_common(10)
        ]
        # Platform stats
        total_platform_time = sum(platforms.values())
        platform_stats = [
            DeviceStat(
                device=platform,
                watch_time=int(round(time)),
                percentage=(
                    (time / total_platform_time * 100) if total_platform_time > 0 else 0
                ),
            )
            for platform, time in platforms.most_common(10)
        ]
        # Detect binge sessions
        binge_sessions = self._detect_binge_sessions(watches_by_date)
        # Find repeat watchers
        repeat_watches = {
            title: count for title, count in content_watch_count.items() if count > 1
        }
        # Find longest continuous session
        longest_session = self._find_longest_continuous_session(watch_sessions)
        # Find day with most watching
        day_with_most = self._find_day_with_most_watching(watches_by_date)
        # Calculate daily watch times for consistency analysis
        for date, items in watches_by_date.items():
            daily_total = 0
            for item in items:
                # NOTE(review): same precedence caveat as the per-item loop
                # above -- play_duration is ignored when stopped/started are
                # absent.
                watched_duration = (
                    item.get("play_duration", 0)
                    or (item.get("stopped", 0) - item.get("started", 0))
                    if (item.get("stopped", 0) and item.get("started", 0))
                    else 0 or item.get("watched_duration", 0) or 0
                )
                daily_total += watched_duration / 60  # Convert to minutes
            daily_watch_times.append(daily_total)
        # Analyze time-of-day patterns
        time_of_day_analysis = self._analyze_time_of_day(watches_by_hour)
        # Analyze day-of-week patterns
        day_of_week_analysis = self._analyze_day_of_week(watches_by_weekday)
        # Analyze consistency
        consistency_analysis = self._analyze_consistency(
            daily_watch_times, start_date, end_date
        )
        # Calculate music statistics
        top_artists = [
            {"name": name, "listen_time": int(round(time))}
            for name, time in artists.most_common(10)
        ]
        top_albums = [
            {"name": name, "listen_time": int(round(time))}
            for name, time in albums.most_common(10)
        ]
        top_tracks = [
            {
                "name": name,
                "play_count": int(round(time / 3)) if time > 0 else 1,
            }  # Rough estimate: avg song ~3 min
            for name, time in tracks.most_common(10)
        ]
        # Music genre percentages
        total_music_genre_time = sum(music_genres.values())
        music_genre_stats = [
            {
                "genre": genre,
                "listen_time": int(round(time)),
                "percentage": (
                    (time / total_music_genre_time * 100)
                    if total_music_genre_time > 0
                    else 0
                ),
            }
            for genre, time in music_genres.most_common(10)
        ]
        return {
            "total_watch_time": int(round(total_watch_time)),
            "total_items_watched": len(items_watched),
            "total_episodes_watched": episodes_watched,
            "total_movies_watched": movies_watched,
            "genres": genre_stats,
            "actors": actor_stats,
            "directors": director_stats,
            "devices": device_stats,
            "platforms": platform_stats,
            "binge_sessions": binge_sessions,
            "history": history,
            "repeat_watches": repeat_watches,
            "watches_by_month": dict(watches_by_month),
            "longest_session": longest_session,
            "day_with_most": day_with_most,
            "time_of_day": time_of_day_analysis,
            "day_of_week": day_of_week_analysis,
            "consistency": consistency_analysis,
            # Music statistics
            "tracks_listened": tracks_listened,
            "music_listen_time": int(round(music_listen_time)),
            "top_artists": top_artists,
            "top_albums": top_albums,
            "top_tracks": top_tracks,
            "music_genres": music_genre_stats,
        }
def _detect_binge_sessions(
self, watches_by_date: Dict[str, List[Dict]]
) -> List[BingeSession]:
"""Detect binge watching sessions (multiple episodes/movies in one day)"""
binge_sessions = []
for date, items in watches_by_date.items():
# Group by show/movie
by_content = defaultdict(list)
for item in items:
title = item.get("grandparent_title") or item.get("title", "")
by_content[title].append(item)
# Find days with significant watching
# Duration is in seconds, convert to minutes
total_duration = 0
for item in items:
watched_duration = (
item.get("play_duration", 0)
or (item.get("stopped", 0) - item.get("started", 0))
if (item.get("stopped", 0) and item.get("started", 0))
else 0 or item.get("watched_duration", 0) or 0
)
total_duration += watched_duration / 60 # Convert seconds to minutes
# Consider it a binge if more than 2 hours in a day
if total_duration > 120:
episodes = sum(
1 for item in items if item.get("media_type") == "episode"
)
content_list = list(by_content.keys())
binge_sessions.append(
BingeSession(
date=date,
duration=int(round(total_duration)),
content=content_list,
episodes=episodes,
)
)
# Sort by duration
binge_sessions.sort(key=lambda x: x.duration, reverse=True)
return binge_sessions
def _find_longest_continuous_session(
self, watch_sessions: List[Dict]
) -> Optional[Dict]:
"""Find the longest continuous watching session"""
if not watch_sessions:
return None
# Sort sessions by start time
sorted_sessions = sorted(watch_sessions, key=lambda x: x.get("start", 0))
longest = None
max_duration = 0
# Group sessions that are close together (within 30 minutes)
current_group = []
for session in sorted_sessions:
if not current_group:
current_group = [session]
else:
last_end = current_group[-1].get("end", 0)
current_start = session.get("start", 0)
# If sessions are within 30 minutes, consider them continuous
if current_start - last_end <= 1800: # 30 minutes in seconds
current_group.append(session)
else:
# Calculate total duration of this group
group_duration = sum(s.get("duration", 0) for s in current_group)
if group_duration > max_duration:
max_duration = group_duration
longest = {
"duration": int(round(group_duration)),
"start": current_group[0].get("start", 0),
"end": current_group[-1].get("end", 0),
"items": len(current_group),
}
current_group = [session]
# Check last group
if current_group:
group_duration = sum(s.get("duration", 0) for s in current_group)
if group_duration > max_duration:
max_duration = group_duration
longest = {
"duration": int(round(group_duration)),
"start": current_group[0].get("start", 0),
"end": current_group[-1].get("end", 0),
"items": len(current_group),
}
return longest
def _find_day_with_most_watching(
self, watches_by_date: Dict[str, List[Dict]]
) -> Optional[Dict]:
"""Find the day with the most watching time"""
if not watches_by_date:
return None
max_time = 0
best_day = None
for date, items in watches_by_date.items():
total_duration = 0
for item in items:
watched_duration = (
item.get("play_duration", 0)
or (item.get("stopped", 0) - item.get("started", 0))
if (item.get("stopped", 0) and item.get("started", 0))
else 0 or item.get("watched_duration", 0) or 0
)
total_duration += watched_duration / 60 # Convert to minutes
if total_duration > max_time:
max_time = total_duration
best_day = {
"date": date,
"duration": int(round(total_duration)),
"items": len(items),
}
return best_day
def _get_season(self, month: int) -> str:
"""Get season name from month (1-12), respecting hemisphere setting"""
# Northern Hemisphere seasons
if month in [12, 1, 2]:
season = "Winter"
elif month in [3, 4, 5]:
season = "Spring"
elif month in [6, 7, 8]:
season = "Summer"
else:
season = "Fall"
# Flip seasons for Southern Hemisphere
if self.southern_hemisphere:
season_flip = {
"Winter": "Summer",
"Summer": "Winter",
"Spring": "Fall",
"Fall": "Spring",
}
season = season_flip[season]
return season
def _analyze_seasonal_patterns(self, watches_by_month: Dict) -> Dict:
"""Analyze viewing patterns by season"""
if not watches_by_month:
return {}
seasonal_data = defaultdict(
lambda: {"time": 0, "count": 0, "genres": Counter()}
)
for month_key, data in watches_by_month.items():
try:
year, month = map(int, month_key.split("-"))
season = self._get_season(month)
seasonal_data[season]["time"] += data["time"]
seasonal_data[season]["count"] += data["count"]
for genre, time in data["genres"].items():
seasonal_data[season]["genres"][genre] += time
except (ValueError, KeyError):
continue
# Find most active season
most_active = (
max(seasonal_data.items(), key=lambda x: x[1]["time"])
if seasonal_data
else None
)
# Round numbers and convert Counter to dict with rounded values
rounded_seasonal_data = {}
for season, data in seasonal_data.items():
rounded_genres = {
genre: round(time, 2) for genre, time in data["genres"].items()
}
rounded_seasonal_data[season] = {
"time": round(data["time"], 2),
"count": data["count"],
"genres": rounded_genres,
}
return {
"by_season": rounded_seasonal_data,
"most_active": most_active[0] if most_active else None,
"most_active_time": (
int(round(most_active[1]["time"])) if most_active else 0
),
}
def _analyze_time_of_day(self, watches_by_hour: Dict) -> Dict:
"""Analyze viewing patterns by time of day"""
if not watches_by_hour:
return {}
# Group hours into time periods
time_periods = {
"morning": (6, 12), # 6 AM - 12 PM
"afternoon": (12, 18), # 12 PM - 6 PM
"evening": (18, 22), # 6 PM - 10 PM
"night": (22, 6), # 10 PM - 6 AM (wraps around)
}
period_totals = {
period: {"time": 0, "count": 0} for period in time_periods.keys()
}
# Calculate totals by period
for hour, data in watches_by_hour.items():
hour_int = int(hour)
for period, (start, end) in time_periods.items():
if period == "night":
# Handle wrap-around for night (22-6)
if hour_int >= start or hour_int < end:
period_totals[period]["time"] += data["time"]
period_totals[period]["count"] += data["count"]
else:
if start <= hour_int < end:
period_totals[period]["time"] += data["time"]
period_totals[period]["count"] += data["count"]
# Find most active time period
most_active_period = (
max(period_totals.items(), key=lambda x: x[1]["time"])
if period_totals
else None
)
# Find peak hour
peak_hour = (
max(watches_by_hour.items(), key=lambda x: x[1]["time"])
if watches_by_hour
else None
)
return {
"by_period": {
period: {
"time": round(data["time"], 2),
"count": data["count"],
"percentage": (
round(
data["time"]
/ sum(p["time"] for p in period_totals.values())
* 100,
2,
)
if sum(p["time"] for p in period_totals.values()) > 0
else 0
),
}
for period, data in period_totals.items()
},
"most_active_period": most_active_period[0] if most_active_period else None,
"most_active_period_time": (
int(round(most_active_period[1]["time"])) if most_active_period else 0
),
"peak_hour": peak_hour[0] if peak_hour else None,
"peak_hour_time": (int(round(peak_hour[1]["time"])) if peak_hour else 0),
"by_hour": {
str(hour): {"time": round(data["time"], 2), "count": data["count"]}
for hour, data in sorted(watches_by_hour.items())
},
}
def _analyze_day_of_week(self, watches_by_weekday: Dict) -> Dict:
"""Analyze viewing patterns by day of week"""
if not watches_by_weekday:
return {}
weekday_names = {
0: "Monday",
1: "Tuesday",
2: "Wednesday",
3: "Thursday",
4: "Friday",
5: "Saturday",
6: "Sunday",
}
# Calculate totals
weekday_totals = {}
for weekday, data in watches_by_weekday.items():
weekday_totals[weekday] = {
"time": round(data["time"], 2),
"count": data["count"],
"name": weekday_names.get(int(weekday), "Unknown"),
}
# Find most active day
most_active_day = (
max(watches_by_weekday.items(), key=lambda x: x[1]["time"])
if watches_by_weekday
else None
)
# Calculate percentages
total_time = sum(data["time"] for data in weekday_totals.values())
for weekday in weekday_totals:
weekday_totals[weekday]["percentage"] = (
round(weekday_totals[weekday]["time"] / total_time * 100, 2)
if total_time > 0
else 0
)
return {
"by_weekday": {
weekday_names.get(int(day), f"Day_{day}"): data
for day, data in sorted(weekday_totals.items())
},
"most_active_day": (
weekday_names.get(int(most_active_day[0]), "Unknown")
if most_active_day
else None
),
"most_active_day_time": (
int(round(most_active_day[1]["time"])) if most_active_day else 0
),
}
def _analyze_consistency(
self, daily_watch_times: List[float], start_date: str, end_date: str
) -> Dict:
"""Analyze viewing consistency and regularity"""
if not daily_watch_times:
return {}
import statistics
# Calculate basic stats
total_days = self.calculate_days_between(start_date, end_date) + 1
days_with_watching = len([t for t in daily_watch_times if t > 0])
days_without_watching = total_days - days_with_watching
if not daily_watch_times:
return {
"consistency_score": 0,
"regularity": "no_data",
"average_daily_minutes": 0,
"days_with_watching": 0,
"days_without_watching": total_days,
"watch_frequency_percentage": 0,
}
avg_daily = statistics.mean(daily_watch_times)
# Calculate consistency score (0-100)
# Based on: frequency of watching + regularity of watch time
watch_frequency = (
(days_with_watching / total_days) * 100 if total_days > 0 else 0
)
# Regularity: lower standard deviation = more regular
if len(daily_watch_times) > 1:
std_dev = statistics.stdev(daily_watch_times)
# Normalize: lower std dev relative to mean = more consistent
# If mean is 0, consistency is 0
if avg_daily > 0:
coefficient_of_variation = std_dev / avg_daily
# Lower CV = more consistent (invert and scale to 0-100)
regularity_score = max(
0, min(100, 100 - (coefficient_of_variation * 50))
)
else:
regularity_score = 0
else:
regularity_score = 50 # Neutral if only one data point
# Combined consistency score (weighted average)
consistency_score = watch_frequency * 0.6 + regularity_score * 0.4
# Determine regularity label
if consistency_score >= 80:
regularity = "very_consistent"
elif consistency_score >= 60:
regularity = "consistent"
elif consistency_score >= 40:
regularity = "moderate"
elif consistency_score >= 20:
regularity = "sporadic"
else:
regularity = "irregular"
# Find longest streak without watching
longest_gap = 0
current_gap = 0
for time in daily_watch_times:
if time == 0:
current_gap += 1
longest_gap = max(longest_gap, current_gap)
else:
current_gap = 0