-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
304 lines (256 loc) · 11 KB
/
main.py
File metadata and controls
304 lines (256 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
from dbus import SessionBus
import dbus
import os
import time
import json
from dotenv import load_dotenv
from pypresence import Presence
import pypresence
import pympris
from minio import Minio
from minio.error import S3Error
from mimetypes import guess_type
import magic
import requests
from urllib.parse import urlparse
from io import BytesIO
from PIL import Image
import io
import hashlib
# Obtient le chemin d'accès au dossier du script actuel
script_dir = os.path.dirname(os.path.abspath(__file__))
# Construit le chemin d'accès au fichier .env basé sur le dossier du script
env_path = os.path.join(script_dir, '.env')
# Charge les variables d'environnement depuis le fichier .env spécifié
load_dotenv(env_path)
# Discord RPC and AWS S3/MinIO configuration :
MINIO_URL = os.getenv('MINIO_URL')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
DISCORD_CLIENT_ID = os.getenv('DISCORD_CLIENT_ID') # Assurez-vous que ceci est votre propre Client ID Discord
# Construit le chemin d'accès au fichier de cache d'image basé sur le dossier du script
CACHE_PATH = os.path.join(script_dir, 'image_cache.json') # Chemin vers un fichier de cache pour les images téléchargées
ICON_NAMES = {
'Clementine': 'clementine',
'Media Player Classic Qute Theater': 'mpc-qt',
'mpv': 'mpv',
'Music Player Daemon': 'mpd',
'VLC media player': 'vlc',
'SMPlayer': 'smplayer',
'Lollypop': 'lollypop',
'Mozilla Firefox': 'firefox',
'MellowPlayer': 'mellowplayer',
'Chrome': 'chrome',
'Spotify': 'spotify',
'Spotube': 'spotube',
'Strawberry': 'strawberry',
'default_icon': 'default_icon',
}
RPC = Presence(DISCORD_CLIENT_ID, response_timeout=15)
RPC.connect()
def get_icon_by_name(icon_name):
# Retourne le nom de l'icône basé sur le nom de l'application
return ICON_NAMES.get(icon_name)
def get_mpris_players():
session_bus = SessionBus()
return [service for service in session_bus.list_names() if service.startswith('org.mpris.MediaPlayer2.')]
def get_active_player():
players = get_mpris_players()
if not players:
print("Aucun lecteur trouvé.")
return None, None
player_id = players[0] # Exemple : Prend le premier lecteur trouvé
bus = dbus.SessionBus()
players_ids = list(pympris.available_players())
mp = pympris.MediaPlayer(players_ids[0], bus)
player_name = mp.root.Identity
player_bus = SessionBus().get_object(player_id, "/org/mpris/MediaPlayer2")
print(player_name)
return dbus.Interface(player_bus, "org.freedesktop.DBus.Properties"), player_name
def get_current_track_info(player_properties):
metadata = player_properties.Get("org.mpris.MediaPlayer2.Player", "Metadata")
title = metadata.get('xesam:title', "Titre inconnu")
artist = metadata.get('xesam:artist', ["Artiste inconnu"])[0]
art_url = metadata.get('mpris:artUrl', "")
return title, artist, art_url
# Récupération
def get_json(f="image_cache", encoding="utf-8"):
try:
with open(f"{f}.json", "r", encoding=encoding) as file:
return json.load(file)
except FileNotFoundError:
return {}
# Mise à Jour
def update_json(data, f="image_cache", encoding="utf-8"):
with open(f"{f}.json", "w", encoding=encoding) as file:
json.dump(data, file, indent=4)
def is_url(path):
try:
result = urlparse(path)
return all([result.scheme, result.netloc])
except ValueError:
return False
def generate_image_hash(url):
"""Génère un hash MD5 de l'URL pour servir d'identifiant unique."""
return hashlib.md5(url.encode('utf-8')).hexdigest()
def get_cached_url(image_hash):
"""Récupère l'URL depuis le cache basé sur le hash de l'image."""
data = get_json("image_cache")
return data.get(image_hash)
def update_cache(image_hash, object_url):
"""Mise à jour du cache avec la nouvelle URL de l'image."""
data = get_json("image_cache")
data[image_hash] = object_url
update_json(data, "image_cache")
def upload_file_to_minio(bucket_name, file_path, object_name=None):
# Configuration du client MinIO
client = Minio(
MINIO_URL,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=True
)
if file_path.startswith('file://'):
image_path_str = file_path[7:]
else:
image_path_str = file_path
image_hash = generate_image_hash(image_path_str)
cached_url = get_cached_url(image_hash)
if cached_url:
print(f"URL depuis le cache pour {image_path_str}: {cached_url}")
return cached_url
if is_url(image_path_str):
# Téléchargement de l'image depuis l'URL
response = requests.get(image_path_str)
if response.status_code != 200:
print(f"Erreur lors du téléchargement de l'image: {response.status_code}")
return None
# Conversion de l'image en PNG
image = Image.open(io.BytesIO(response.content))
image_converted = image.convert("RGBA")
png_image_io = io.BytesIO()
image_converted.save(png_image_io, format='PNG')
png_image_io.seek(0)
# Détermination du type MIME
mime_type = "image/png"
extension = ".png"
# Définition du nom de l'objet avec l'extension PNG
object_name_with_extension = (object_name or image_path_str.split('/')[-1].split('.')[0]) + extension
# Détermination de la taille du fichier PNG
file_data = png_image_io
file_size = png_image_io.getbuffer().nbytes
try:
# Upload de l'image convertie en PNG dans le bucket MinIO
client.put_object(
bucket_name,
object_name_with_extension,
file_data,
file_size,
content_type=mime_type,
)
print(f"Fichier {object_name_with_extension} uploadé avec succès.")
# Construction de l'URL de l'image uploadée
scheme = "https://"
object_url = f"{scheme}{MINIO_URL}/{bucket_name}/{object_name_with_extension}"
update_cache(image_hash, object_url)
return object_url
except S3Error as exc:
print("Erreur lors de l'upload:", exc)
return None
else:
# Lecture du cache
data = get_json("image_cache")
# Vérification du cache
cached_url = data.get(image_path_str)
if cached_url:
print(f"URL depuis le cache pour {image_path_str}: {cached_url}")
return cached_url
if object_name is None:
object_name = os.path.basename(image_path_str)
mime_type = magic.from_file(image_path_str, mime=True)
try:
# Utilisez python-magic pour deviner le type MIME basé sur le contenu du fichier
client.fput_object(
bucket_name, object_name, image_path_str,
content_type=mime_type, # Utilisez le type MIME déterminé par python-magic
)
print(f"Fichier {image_path_str} uploadé comme {object_name} avec le type MIME {mime_type}.")
# Construire l'URL de l'objet
scheme = "https://"
object_url = f"{scheme}{MINIO_URL}/{bucket_name}/{object_name}"
# Mise à jour du cache uniquement si l'upload réussit
data[image_path_str] = object_url
update_json(data, "image_cache")
return object_url
except S3Error as exc:
print("Erreur lors de l'upload:", exc)
return None
def format_time(microseconds):
# Convertit les microsecondes en minutes et secondes
seconds = int(microseconds / 1000000) # Convertit en secondes
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes:02}:{seconds:02}"
def update_discord_presence(title, artist, position, duration, image_link, player_name):
current_time = time.time() # Temps actuel en secondes
start_timestamp = current_time - (position / 1000000) # Convertit position en secondes et soustrait de l'heure actuelle
end_timestamp = start_timestamp + (duration / 1000000)
# Utilisez une icône par défaut si le lecteur n'est pas dans la liste
icon_name = get_icon_by_name(player_name) or get_icon_by_name("default_icon")
large_image = image_link or icon_name
try:
# Met à jour la présence Discord avec une barre de progression
RPC.update(
details=title,
state=f"par {artist}",
start=start_timestamp,
end=end_timestamp,
large_image=large_image,
small_image=icon_name,
large_text="Écoute en cours",
small_text=player_name
)
except pypresence.exceptions.ResponseTimeout:
print("Timeout en attente de réponse de Discord. Tentative de reconnexion...")
def get_player_properties():
session_bus = SessionBus()
for service in session_bus.list_names():
if service.startswith('org.mpris.MediaPlayer2.'):
try:
player = session_bus.get_object(service, '/org/mpris/MediaPlayer2')
properties_manager = dbus.Interface(player, dbus_interface='org.freedesktop.DBus.Properties')
metadata = properties_manager.Get('org.mpris.MediaPlayer2.Player', 'Metadata')
position = properties_manager.Get('org.mpris.MediaPlayer2.Player', 'Position')
duration = metadata.get('mpris:length', 0) # Fournit une valeur par défaut de 0 si non trouvé
return position, duration
except dbus.DBusException as e:
print(f"Erreur DBus lors de la récupération des propriétés du lecteur : {e}")
continue # Continue à essayer avec les autres services si l'un échoue
return None, None
def main():
while True:
player_properties, player_name = get_active_player()
if player_properties:
title, artist, art_url = get_current_track_info(player_properties)
position, duration = get_player_properties() # Assurez-vous que cette fonction retourne ces deux valeurs
if title and artist:
image_link = handle_image_caching_and_upload(art_url) if art_url else None
update_discord_presence(title, artist, position, duration, image_link, player_name)
else:
clear_discord_presence()
else:
print("Aucun lecteur trouvé.")
clear_discord_presence()
time.sleep(5)
def clear_discord_presence():
try:
RPC.clear() # Efface la présence sur Discord
print("Présence Discord effacée.")
except Exception as e:
print(f"Erreur lors de l'effacement de la présence Discord: {e}")
def handle_image_caching_and_upload(art_url):
# Logique de cache et d'upload d'image comme montré précédemment
# Retourne le lien de l'image uploadée ou le lien du cache
return upload_file_to_minio('coversimage', art_url)
if __name__ == "__main__":
main()