# cli.py - command-line interface for ShowSweep (manage unwatched TV shows in Plex).
import argparse
import sys
import logging
import time
import sqlite3
from plex_client import PlexClient
from overseerr_client import OverseerrClient
from tautulli_client import TautulliClient
from sonarr_client import SonarrClient
def safe_execute(cursor, query, params=(), retries=5, delay=0.2):
    """Execute a SQLite query, retrying while the database is locked.

    Args:
        cursor: Object exposing ``execute(query, params)``, e.g. a
            ``sqlite3.Cursor``.
        query: SQL statement to execute.
        params: Values bound to the statement's placeholders. Defaults to
            no parameters.
        retries: Maximum number of execution attempts.
        delay: Seconds to sleep between two consecutive attempts.

    Returns:
        None, as soon as one attempt succeeds.

    Raises:
        sqlite3.OperationalError: Any operational error whose message does
            not contain "database is locked" is re-raised immediately.
        RuntimeError: Every attempt failed because the database was locked.
            The last lock error is attached as ``__cause__``.
    """
    last_lock_error = None
    for attempt in range(retries):
        try:
            cursor.execute(query, params)
        except sqlite3.OperationalError as exc:
            if "database is locked" not in str(exc):
                raise
            last_lock_error = exc
            # Sleep only when another attempt follows; waiting after the
            # final failure would just delay the error.
            if attempt < retries - 1:
                time.sleep(delay)
        else:
            return
    raise RuntimeError(
        "Failed to write to database after multiple retries due to locking."
    ) from last_lock_error
# Handles command line interface and argument parsing

# Actions accepted by --action and by the "action" key of the [general]
# config section.
_VALID_ACTIONS = ('delete', 'keep_first_season', 'keep_first_episode', 'keep')

# For each action that modifies Plex: the Plex client method that performs it
# and the message logged when that method returns a truthy result.
_PLEX_ACTIONS = {
    'delete': ('delete_show', "Deleted show '{title}' from Plex"),
    'keep_first_season': (
        'keep_first_season',
        "Kept only first season of '{title}' in Plex",
    ),
    'keep_first_episode': (
        'keep_first_episode',
        "Kept only first episode of '{title}' in Plex",
    ),
}

# Interactive menu: any input other than these keys means "keep".
_MENU_CHOICES = {
    '1': 'delete',
    '2': 'keep_first_season',
    '3': 'keep_first_episode',
}


def _config_bool(config, section, key, default=False):
    """Return ``config[section][key]`` as a bool, or *default* if absent."""
    if section not in config or key not in config[section]:
        return default
    value = config[section][key]
    if isinstance(value, bool):
        return value
    return str(value).lower() in ("1", "true", "yes", "on")


def _config_str(config, section, key, default=None):
    """Return ``config[section][key]`` unchanged, or *default* if absent."""
    if section not in config or key not in config[section]:
        return default
    return config[section][key]


def _build_parser(config):
    """Build the argument parser, taking defaults from the [general] section."""

    def general_flag(key):
        return _config_bool(config, 'general', key, False)

    parser = argparse.ArgumentParser(description="ShowSweep: Manage unwatched TV shows in Plex.")
    # NOTE: every flag is store_true, so a value enabled in the config cannot
    # be switched off from the command line.
    parser.add_argument('--skip-confirmation', action='store_true', default=general_flag('skip_confirmation'), help='Run without interactive prompts')
    parser.add_argument('--debug', action='store_true', default=general_flag('debug'), help='Enable debug level logging')
    parser.add_argument('--force-refresh', action='store_true', default=general_flag('force_refresh'), help='Bypass cache and fetch fresh data')
    parser.add_argument('--action', choices=list(_VALID_ACTIONS), default=_config_str(config, 'general', 'action', None), help='Default action for non-interactive mode')
    parser.add_argument('--skip-overseerr', action='store_true', default=general_flag('skip_overseerr'), help='Skip Overseerr checks for recent requests')
    parser.add_argument('--skip-tautulli', action='store_true', default=general_flag('skip_tautulli'), help='Skip Tautulli checks for watch history')
    parser.add_argument('--ignore-first-season', action='store_true', default=general_flag('ignore_first_season'), help='Ignore shows that only have their first season downloaded')
    parser.add_argument('--ignore-first-episode', action='store_true', default=general_flag('ignore_first_episode'), help='Ignore shows that only have their first episode downloaded')
    return parser


def _record_eligible_show(db, show, tvdb_id):
    """Insert or update *show* in the ``shows`` table as 'eligible' and commit."""
    cursor = db.conn.cursor()
    now = int(time.time())
    if tvdb_id:
        safe_execute(
            cursor,
            'REPLACE INTO shows (id, title, last_processed, action, tvdb_id, disk_space_bytes, disk_space_formatted) VALUES (?, ?, ?, ?, ?, ?, ?)',
            (
                show['id'],
                show['title'],
                now,
                'eligible',
                tvdb_id,
                show['disk_space_bytes'],
                show['disk_space_formatted'],
            ),
        )
    else:
        # Without a TVDB ID the tvdb_id column is left out of the statement.
        safe_execute(
            cursor,
            'REPLACE INTO shows (id, title, last_processed, action, disk_space_bytes, disk_space_formatted) VALUES (?, ?, ?, ?, ?, ?)',
            (
                show['id'],
                show['title'],
                now,
                'eligible',
                show['disk_space_bytes'],
                show['disk_space_formatted'],
            ),
        )
    db.conn.commit()


def _find_eligible_shows(args, plex, overseerr, tautulli, db):
    """Scan Plex and return ``(total_shows, skipped_shows, eligible_shows)``.

    A show is eligible when none of the enabled checks (ignore-first-season,
    ignore-first-episode, Overseerr recent request, Tautulli watch history,
    Plex watch history) rules it out. Each eligible show dict gains
    ``disk_space_bytes``, ``disk_space_formatted`` and, when known,
    ``tvdb_id``, and is written to the ``shows`` table.
    """
    eligible_shows = []
    skipped_shows = {'overseerr': 0, 'tautulli': 0, 'plex': 0}

    shows = plex.get_shows()
    total_shows = len(shows)
    logging.info(f"Discovered {total_shows} shows from Plex.")

    for show in shows:
        logging.debug(f"Processing show: {show}")
        show_id = show['id']
        title = show['title']

        if args.ignore_first_season and show.get('has_only_first_season', False):
            logging.info(f"Ignoring show '{title}' (has only first season)")
            continue
        if args.ignore_first_episode and show.get('has_only_first_episode', False):
            logging.info(f"Ignoring show '{title}' (has only first episode)")
            continue

        # Safe-deletion rules, honouring the --skip-* flags.
        if not args.skip_overseerr and overseerr.is_recent_request(show_id):
            logging.info(f"Skipping show '{title}' (recent Overseerr request)")
            skipped_shows['overseerr'] += 1
            continue

        # Reset per show so that a value from a previous iteration can never
        # leak into this one (replaces the former `in locals()` probe).
        tvdb_temp = None
        if not args.skip_tautulli:
            has_watch_history, tvdb_temp = tautulli.get_watch_stats(show_id)
            if has_watch_history:
                logging.info(f"Skipping show '{title}' (has watch history in Tautulli)")
                skipped_shows['tautulli'] += 1
                continue

        if plex.has_watch_history(show_id):
            logging.info(f"Skipping show '{title}' (has watch history in Plex)")
            skipped_shows['plex'] += 1
            continue

        logging.debug(f"Show '{title}' eligible for action.")

        # Prefer the TVDB ID returned with the watch stats; only fetch
        # metadata when that did not provide one.
        tvdb_id = tvdb_temp or None
        if not tvdb_id and not args.skip_tautulli:
            metadata_json = tautulli._fetch_metadata(show_id)
            if metadata_json:
                tvdb_id = tautulli._extract_tvdb_id(metadata_json)

        disk_space_bytes, disk_space_formatted = plex.get_show_disk_space(show_id)
        logging.info(f"Show '{title}' uses {disk_space_formatted} of disk space")

        if tvdb_id:
            logging.info(f"Found TVDB ID {tvdb_id} for eligible show '{title}'")
            db.save_tvdb_id(show_id, tvdb_id)

        show['disk_space_bytes'] = disk_space_bytes
        show['disk_space_formatted'] = disk_space_formatted
        if tvdb_id:
            show['tvdb_id'] = tvdb_id
        eligible_shows.append(show)

        _record_eligible_show(db, show, tvdb_id)

    logging.debug(f"Recorded {len(eligible_shows)} eligible shows in the database.")
    return total_shows, skipped_shows, eligible_shows


def _log_report(plex, total_shows, skipped_shows, eligible_shows):
    """Log the summary report and, if any, the table of eligible shows."""
    eligible_count = len(eligible_shows)
    logging.info("=" * 50)
    logging.info("SHOWSWEEP REPORT")
    logging.info("=" * 50)
    logging.info(f"Total shows scanned: {total_shows}")
    logging.info(f"Shows skipped due to Overseerr recent requests: {skipped_shows['overseerr']}")
    logging.info(f"Shows skipped due to Tautulli watch history: {skipped_shows['tautulli']}")
    logging.info(f"Shows skipped due to Plex watch history: {skipped_shows['plex']}")
    logging.info(f"Shows eligible for action: {eligible_count}")

    if eligible_count > 0:
        logging.info("\nEligible shows:")
        logging.info("-" * 70)
        logging.info(f"{'#':<3} {'Title':<40} {'Year':<6} {'Size':<10}")
        logging.info("-" * 70)

        # `or 0` keeps the total computable if a show's size is None.
        total_bytes = sum(show.get('disk_space_bytes', 0) or 0 for show in eligible_shows)
        formatted_total = plex._format_size(total_bytes)

        for i, show in enumerate(eligible_shows, 1):
            year = f"{show.get('year', '')}" if show.get('year') else "-"
            size = show.get('disk_space_formatted', '-')
            logging.info(f"{i:<3} {show['title'][:38]:<40} {year:<6} {size:<10}")

        logging.info("-" * 70)
        logging.info(f"Total disk space used by eligible shows: {formatted_total}")


def _apply_action(plex, sonarr, db, show, action):
    """Apply *action* to one eligible show and record it in the database.

    For 'delete', 'keep_first_season' and 'keep_first_episode' the matching
    Plex client method is called, then the series is unmonitored in Sonarr
    whatever the Plex call returned. Any other action leaves the show as is.
    """
    title = show['title']
    plex_action = _PLEX_ACTIONS.get(action)
    if plex_action is None:
        logging.info(f"Keeping show '{title}' as is")
    else:
        method_name, success_message = plex_action
        if getattr(plex, method_name)(show['id']):
            logging.info(success_message.format(title=title))
        if sonarr.unmonitor_series(show['id'], show.get('guid')):
            logging.info(f"Unmonitored show '{title}' in Sonarr")
    db.record_action(show['id'], action)
    logging.info(f"Action '{action}' applied to show: {title}")


def _prompt_for_action(show):
    """Ask the user what to do with *show*; unrecognised input means 'keep'."""
    print(f"\nShow: {show['title']} ({show.get('year', '')})")
    print("Choose action:")
    print(" 1. delete")
    print(" 2. keep_first_season")
    print(" 3. keep_first_episode")
    print(" 4. keep")
    choice = input("Enter choice [1-4, default=4]: ").strip()
    return _MENU_CHOICES.get(choice, 'keep')


def main_cli(config, db):
    """Run the ShowSweep command-line workflow.

    Parses the command line (defaults come from the ``general`` section of
    *config*), scans Plex for shows that pass the enabled checks, logs a
    report, then applies an action to every eligible show: the ``--action``
    value when ``--skip-confirmation`` is set, otherwise the action the user
    picks at an interactive prompt.

    Args:
        config: Mapping-like configuration supporting ``section in config``
            and ``config[section][key]``; also passed to each API client.
        db: Database wrapper exposing ``conn`` (used for ``cursor()`` and
            ``commit()``), ``save_tvdb_id()`` and ``record_action()``; also
            passed to each API client.
    """
    args = _build_parser(config).parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
        logging.debug("Debug mode enabled via --debug argument.")
    logging.debug(f"CLI arguments: {args}")

    plex = PlexClient(config, db)
    overseerr = OverseerrClient(config, db)
    tautulli = TautulliClient(config, db)
    sonarr = SonarrClient(config, db)
    logging.debug("Initialized Plex, Overseerr, Tautulli, and Sonarr clients.")

    # Prefetch Overseerr requests unless the Overseerr checks are disabled.
    if not args.skip_overseerr:
        logging.info("Prefetching Overseerr requests...")
        overseerr._fetch_all_requests(force_refresh=args.force_refresh)
        logging.info("Overseerr requests cached.")
    else:
        logging.info("Skipping Overseerr prefetch (--skip-overseerr enabled)")

    total_shows, skipped_shows, eligible_shows = _find_eligible_shows(
        args, plex, overseerr, tautulli, db
    )
    _log_report(plex, total_shows, skipped_shows, eligible_shows)

    if args.skip_confirmation:
        # Non-interactive: apply the same action to every eligible show.
        action = args.action or 'keep'
        if action not in _VALID_ACTIONS:
            # argparse checks `choices` only for values given on the command
            # line, so an invalid action from the config file reaches here.
            logging.warning(
                f"Unknown action '{action}' in configuration; falling back to 'keep'"
            )
            action = 'keep'
        logging.info(f"Applying action '{action}' to all eligible shows (skip-confirmation enabled)")
        for show in eligible_shows:
            _apply_action(plex, sonarr, db, show, action)
    else:
        # Interactive: ask for each eligible show.
        for show in eligible_shows:
            _apply_action(plex, sonarr, db, show, _prompt_for_action(show))

    logging.info("\nProcessing complete.")