-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatabase.py
More file actions
137 lines (124 loc) · 4.77 KB
/
database.py
File metadata and controls
137 lines (124 loc) · 4.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Handles SQLite database management
import sqlite3
import os
import time
import shutil
import logging
class DatabaseManager:
def __init__(self, db_path):
self.db_path = db_path
self.conn = None
logging.debug(f"DatabaseManager initialized with db_path={db_path}")
def setup(self):
self.conn = sqlite3.connect(self.db_path)
c = self.conn.cursor()
c.execute("PRAGMA journal_mode=WAL;")
# Create tables if not exist
c.execute('''CREATE TABLE IF NOT EXISTS shows (
id TEXT PRIMARY KEY,
title TEXT,
last_modified INTEGER,
last_processed INTEGER,
tvdb_id TEXT,
disk_space_bytes INTEGER,
disk_space_formatted TEXT,
action TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS actions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
show_id TEXT,
action TEXT,
timestamp INTEGER
)''')
# Add disk space columns if they don't exist
try:
# Check if disk_space_bytes column exists
c.execute("SELECT disk_space_bytes FROM shows LIMIT 1")
except sqlite3.OperationalError:
logging.info("Adding disk space columns to shows table")
c.execute("ALTER TABLE shows ADD COLUMN disk_space_bytes INTEGER")
c.execute("ALTER TABLE shows ADD COLUMN disk_space_formatted TEXT")
self.conn.commit()
logging.debug("Database tables ensured.")
def record_action(self, show_id, action):
"""
Record an action taken for a show
"""
try:
c = self.conn.cursor()
now = int(time.time())
c.execute('UPDATE shows SET action = ?, last_modified = ? WHERE id = ?',
(action, now, show_id))
self.conn.commit()
return True
except Exception as e:
logging.error(f"Error recording action {action} for show {show_id}: {e}")
return False
def save_tvdb_id(self, show_id, tvdb_id):
"""
Store the TVDB ID for a show
"""
try:
c = self.conn.cursor()
c.execute('UPDATE shows SET tvdb_id = ? WHERE id = ?', (tvdb_id, show_id))
# If show doesn't exist yet, insert it
if c.rowcount == 0:
c.execute('INSERT OR IGNORE INTO shows (id, tvdb_id) VALUES (?, ?)', (show_id, tvdb_id))
self.conn.commit()
return True
except Exception as e:
logging.error(f"Error saving TVDB ID {tvdb_id} for show {show_id}: {e}")
return False
def get_tvdb_id(self, show_id):
"""
Retrieve the TVDB ID for a show
"""
try:
c = self.conn.cursor()
c.execute('SELECT tvdb_id FROM shows WHERE id = ?', (show_id,))
result = c.fetchone()
return result[0] if result else None
except Exception as e:
logging.error(f"Error retrieving TVDB ID for show {show_id}: {e}")
return None
def save_disk_space(self, show_id, disk_space_bytes, disk_space_formatted):
"""
Store the disk space information for a show
"""
try:
c = self.conn.cursor()
c.execute('UPDATE shows SET disk_space_bytes = ?, disk_space_formatted = ? WHERE id = ?',
(disk_space_bytes, disk_space_formatted, show_id))
# If show doesn't exist yet, don't insert (should never happen as this is called after show record exists)
self.conn.commit()
return True
except Exception as e:
logging.error(f"Error saving disk space info for show {show_id}: {e}")
return False
def repair(self):
# Simple integrity check
try:
c = self.conn.cursor()
c.execute('PRAGMA integrity_check')
result = c.fetchone()
if result[0] != 'ok':
logging.error('DB integrity check failed')
raise Exception('DB integrity check failed')
except Exception as e:
logging.error(f"DB repair failed: {e}")
return False
logging.debug("Database integrity check passed.")
return True
def backup(self, backup_path):
self.conn.commit()
shutil.copy2(self.db_path, backup_path)
logging.info(f"Database backed up to {backup_path}.")
def restore(self, backup_path):
self.conn.close()
shutil.copy2(backup_path, self.db_path)
self.conn = sqlite3.connect(self.db_path)
logging.info(f"Database restored from {backup_path}.")
def close(self):
if self.conn:
self.conn.close()
logging.debug("Database connection closed.")