-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathutils.py
More file actions
350 lines (303 loc) · 14 KB
/
utils.py
File metadata and controls
350 lines (303 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
from datetime import datetime
import json
import os
import re
import requests
import imghdr
from mutagen.id3 import ID3, COMM, APIC, TPE1, TALB, TIT2, TDRC, TXXX, UFID, error as ID3Error
from mutagen import File, MutagenError
from mutagen.mp3 import MP3
from mutagen.flac import FLAC
from mutagen.oggvorbis import OggVorbis
from mutagen.m4a import M4A
from streamrip.db import Database, Downloads, Failed
from config import *
class DeezerAuthError(Exception):
"""Exception raised when Deezer authentication fails (e.g., invalid or stale ARL)."""
pass
def initialize_streamrip_db():
"""Initializes the streamrip database, ensuring tables exist."""
db_path = "/app/temp_downloads/downloads.db"
failed_db_path = "/app/temp_downloads/failed_downloads.db"
# Ensure the directory exists
os.makedirs(os.path.dirname(db_path), exist_ok=True)
print(f"Initializing streamrip database at {db_path}...")
try:
# Instantiating Downloads and Failed should create their tables if they don't exist
# based on streamrip's design. This ensures the schema is in place.
downloads_db = Downloads(db_path)
failed_downloads_db = Failed(failed_db_path)
Database(downloads=downloads_db, failed=failed_downloads_db)
print("Streamrip database initialization complete.")
except Exception as e:
print(f"Error initializing streamrip database: {e}")
# Re-raise the exception to make sure the program doesn't continue with a broken DB
raise
def get_last_playlist_name(playlist_history_file):
"""Retrieves the last playlist name from the history file."""
try:
with open(playlist_history_file, "r") as f:
return f.readline().strip()
except FileNotFoundError:
return None
def save_playlist_name(playlist_history_file, playlist_name):
"""Saves the playlist name to the history file."""
try:
with open(playlist_history_file, "w") as f:
f.write(playlist_name)
except OSError as e:
print(f"Error saving playlist name to file: {e}")
def sanitize_filename(filename):
"""Replaces problematic characters in filenames with underscores."""
return re.sub(r'[\\/:*?"<>|]', '_', filename)
def remove_empty_folders(path):
"""Removes empty folders from a given path."""
for root, dirs, files in os.walk(path, topdown=False):
for dir in dirs:
full_path = os.path.join(root, dir)
if not os.listdir(full_path):
try:
os.rmdir(full_path)
except OSError as e:
print(f"Error removing folder: {full_path}. Error: {e}")
class Tagger:
def __init__(self, album_recommendation_comment=None):
self.target_comment = TARGET_COMMENT
self.lastfm_target_comment = LASTFM_TARGET_COMMENT
self.album_recommendation_comment = album_recommendation_comment
def add_comment_to_file(self, file_path, comment):
"""Add a comment to a specific audio file."""
# Debug logging
debug_info = {
'file_path': file_path,
'comment': comment,
'timestamp': __import__('datetime').datetime.now().isoformat()
}
with open('/app/debug.log', 'a') as f:
f.write(f"ADD_COMMENT_START: {debug_info}\n")
try:
if file_path.lower().endswith('.mp3'):
# For MP3s, use ID3 tags
try:
audio = ID3(file_path)
except error.ID3NoHeaderError:
audio = ID3()
audio.add(COMM(encoding=3, lang='eng', desc='', text=comment))
audio.save(file_path, v2_version=3, v1=2)
print(f"Added ID3 comment to MP3 file: {file_path}")
elif file_path.lower().endswith('.flac'):
# For FLAC, use Vorbis comments
audio = FLAC(file_path)
audio['comment'] = comment
audio.save()
print(f"Added Vorbis comment to FLAC file: {file_path}")
elif file_path.lower().endswith(('.ogg', '.oga')):
# For OggVorbis, use Vorbis comments
audio = OggVorbis(file_path)
audio['comment'] = comment
audio.save()
print(f"Added Vorbis comment to OggVorbis file: {file_path}")
elif file_path.lower().endswith('.m4a'):
# For M4A, use iTunes-style atoms
audio = M4A(file_path)
audio['\xa9cmt'] = [comment]
audio.save()
print(f"Added iTunes comment to M4A file: {file_path}")
else:
print(f"Unsupported file type for adding comment: {file_path}")
return
except Exception as e:
print(f"Error adding comment to {file_path}: {e}")
import traceback
traceback.print_exc()
def _embed_album_art(self, file_path, album_art_url):
"""Downloads and embeds album art into the audio file."""
if not album_art_url:
print(f"No album art URL provided for {file_path}.")
return
try:
response = requests.get(album_art_url, stream=True)
response.raise_for_status()
image_data = response.content
image_type = imghdr.what(None, h=image_data)
if not image_type:
print(f"Could not determine image type for {album_art_url}. Skipping embedding.")
return
mime_type = f"image/{image_type}"
if file_path.lower().endswith('.mp3'):
audio = MP3(file_path, ID3=ID3)
audio.tags.add(
APIC(
encoding=3,
mime=mime_type,
type=3,
desc='Cover',
data=image_data
)
)
audio.save()
print(f"Embedded album art into MP3: {file_path}")
elif file_path.lower().endswith('.flac'):
audio = FLAC(file_path)
image = FLAC.Picture()
image.data = image_data
image.type = 3
image.mime = mime_type
audio.add_picture(image)
audio.save()
print(f"Embedded album art into FLAC: {file_path}")
elif file_path.lower().endswith(('.ogg', '.oga')):
audio = OggVorbis(file_path)
image = OggVorbis.Picture()
image.data = image_data
image.type = 3
image.mime = mime_type
audio.add_picture(image)
audio.save()
print(f"Embedded album art into OggVorbis: {file_path}")
elif file_path.lower().endswith('.m4a'):
audio = M4A(file_path)
image = M4A.Picture()
image.data = image_data
image.type = 3
image.mime = mime_type
audio.tags['covr'] = [image]
audio.save()
print(f"Embedded album art into M4A: {file_path}")
else:
print(f"Unsupported file type for album art embedding: {file_path}")
except requests.exceptions.RequestException as e:
print(f"Error downloading album art from {album_art_url}: {e}")
except Exception as e:
print(f"Error embedding album art into {file_path}: {e}")
def tag_track(self, file_path, artist, title, album, release_date, recording_mbid, source, album_art_url=None, is_album_recommendation=False):
"""Tags a track with metadata using Mutagen and embeds album art."""
# If title is not provided, try to extract it from the filename
if not title:
base_filename = os.path.splitext(os.path.basename(file_path))[0]
extracted_title = base_filename
# Remove artist name if present at the beginning (case-insensitive)
# This regex looks for "Artist - " at the start of the string
artist_pattern = re.compile(f"^{re.escape(artist)}\s*-\s*", re.IGNORECASE)
extracted_title = artist_pattern.sub("", extracted_title, 1)
# Remove common track number patterns like "01 - ", "01. "
extracted_title = re.sub(r"^\d+\s*-\s*", "", extracted_title) # "01 - Title"
extracted_title = re.sub(r"^\d+\.\s*", "", extracted_title) # "01. Title"
extracted_title = re.sub(r"^\(\d+\)\s*", "", extracted_title) # "(01) Title"
# Remove any trailing " - " or leading/trailing whitespace
extracted_title = extracted_title.strip(' -')
extracted_title = extracted_title.strip() # Final trim
# If after all removals, the title is empty, use the original base filename
if not extracted_title:
extracted_title = base_filename
title = extracted_title # Ensure title is set
if is_album_recommendation and self.album_recommendation_comment:
comment = self.album_recommendation_comment
else:
comment = self.target_comment if source == "ListenBrainz" else self.lastfm_target_comment
try:
audio = File(file_path)
if audio is None:
print(f"Could not open audio file with Mutagen: {file_path}")
return
if file_path.lower().endswith('.mp3'):
# For MP3s, use ID3 tags
if audio.tags is None:
audio.tags = ID3()
audio.tags.add(TPE1(encoding=3, text=[artist]))
audio.tags.add(TIT2(encoding=3, text=[title]))
audio.tags.add(TALB(encoding=3, text=[album]))
audio.tags.add(TDRC(encoding=3, text=[release_date]))
audio.tags.add(COMM(encoding=3, lang='eng', desc='', text=[comment]))
if recording_mbid:
# Using TXXX for custom text information
audio.tags.add(TXXX(encoding=3, desc='MUSICBRAINZ_RECORDINGID', text=[recording_mbid]))
# Also set UFID with the MusicBrainz URL
audio.tags.add(UFID(owner='http://musicbrainz.org', data=f'http://musicbrainz.org/recording/{recording_mbid}'.encode('utf-8')))
elif file_path.lower().endswith('.flac'):
# For FLAC, use Vorbis comments
audio['artist'] = artist
audio['title'] = title
audio['album'] = album
audio['date'] = release_date
audio['comment'] = comment
if recording_mbid:
audio['musicbrainz_recordingid'] = recording_mbid
elif file_path.lower().endswith(('.ogg', '.oga')):
# For OggVorbis, use Vorbis comments
audio['artist'] = artist
audio['title'] = title
audio['album'] = album
audio['date'] = release_date
audio['comment'] = comment
if recording_mbid:
audio['musicbrainz_recordingid'] = recording_mbid
elif file_path.lower().endswith('.m4a'):
# For M4A, use iTunes-style atoms
audio['\xa9ART'] = [artist]
audio['\xa9nam'] = [title]
audio['\xa9alb'] = [album]
audio['\xa9day'] = [release_date]
audio['\xa9cmt'] = [comment]
if recording_mbid:
# M4A does not have a standard tag for MusicBrainz ID, use a custom one
audio['----:com.apple.iTunes:MusicBrainz Recording Id'] = [recording_mbid.encode('utf-8')]
else:
print(f"Unsupported file type for tagging: {file_path}")
return
audio.save()
print(f"Successfully tagged {file_path} with Mutagen.")
if album_art_url:
self._embed_album_art(file_path, album_art_url)
except MutagenError as e:
print(f"Error tagging {file_path} with Mutagen: {e}")
except Exception as e:
print(f"An unexpected error occurred while tagging {file_path}: {e}")
def get_album_art(self, album_id, salt, token):
"""Fetches album art from Navidrome."""
url = f"{ROOT_ND}/rest/getCoverArt.view"
params = {
'u': USER_ND,
't': token,
's': salt,
'v': '1.16.1',
'c': 'python-script',
'id': album_id,
'size': 1200
}
try:
response = requests.get(url, params=params, stream=True)
response.raise_for_status()
return response.content
except requests.exceptions.RequestException as e:
print(f"Error fetching album art: {e}")
return None
def update_status_file(download_id, status, message=None, title=None, current_track_count=None, total_track_count=None):
if not download_id:
return
status_dir = "/tmp/recommand_download_status"
os.makedirs(status_dir, exist_ok=True)
status_file_path = os.path.join(status_dir, f"{download_id}.json")
status_data = {
"status": status,
"timestamp": datetime.now().isoformat()
}
if message:
status_data["message"] = message
if title:
status_data["title"] = title
else:
# Default title based on status
if status == "completed":
status_data["title"] = "Download completed"
elif status == "failed":
status_data["title"] = "Download failed"
elif status == "in_progress":
status_data["title"] = "Download in progress"
if current_track_count is not None:
status_data["current_track_count"] = current_track_count
if total_track_count is not None:
status_data["total_track_count"] = total_track_count
with open(status_file_path, 'w') as f:
json.dump(status_data, f)
print(f"Updated status file for {download_id}: {status}")