-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
163 lines (133 loc) · 5.03 KB
/
utils.py
File metadata and controls
163 lines (133 loc) · 5.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import logging
import re
import time
from configparser import ConfigParser
import requests
from google.genai import Client
from lyricsgenius import Genius
from openai import OpenAI
# Load API credentials and model names from config.ini. The path is relative,
# so it is resolved against the process's current working directory.
config = ConfigParser()
config.read("config.ini")
# Shared API clients, constructed once at import time from the config values.
# A missing section or key in config.ini raises KeyError here, at import.
gemini_client = Client(api_key=config["gemini"]["API_KEY"])
openai_client = OpenAI(api_key=config["openai"]["API_KEY"])
genius_client = Genius(access_token=config["genius"]["TOKEN"])
# NOTE(review): presumably silences lyricsgenius' console status output - confirm.
genius_client.verbose = False
# Endpoint used for both the entity search and the entity lookup in get_handle().
WIKIDATA_API_URL = "https://www.wikidata.org/w/api.php"
INSTAGRAM_PROPERTY_ID = "P2003" # P2003 is the Wikidata property for Instagram username
def create_hashtag(text: str) -> str:
    """
    Builds a lowercase, ASCII-only hashtag from free-form text.

    Accented letters are folded to their base letter first (e.g. "é" -> "e")
    so they survive the ASCII filter instead of being silently dropped. Every
    remaining character outside a-z / 0-9 is then removed.

    Args:
        text: The input string.

    Returns:
        "#" followed by the cleaned text, or an empty string when no
        alphanumeric character is left.
    """
    import unicodedata  # local import keeps this function self-contained

    # NFKD splits a precomposed character into its base letter plus combining
    # marks; the marks are non-alphanumeric and are stripped by the regex.
    folded = unicodedata.normalize("NFKD", text).lower()
    hash_text = re.sub(r"[^a-z0-9]", "", folded)
    return "#" + hash_text if hash_text else ""
def get_img_prompt(prompt: str) -> str:
    """Send a prompt to the configured Gemini model and return its text reply.

    Args:
        prompt: The text passed to the model as its contents.

    Returns:
        The ``text`` attribute of the model response.
    """
    model_name = config["gemini"]["MODEL"]
    reply = gemini_client.models.generate_content(model=model_name, contents=prompt)
    return reply.text
def get_image(prompt: str) -> str | None:
    """Generate an image with the configured OpenAI image model, retrying once.

    The model name is read from the ``[openai] MODEL`` config entry. Each
    failed attempt is logged; a one-second pause separates the attempts.

    Args:
        prompt: The text prompt for image generation.

    Returns:
        The URL of the generated image, or None if every attempt fails.
    """
    attempts = 2
    for attempt in range(1, attempts + 1):
        try:
            img = openai_client.images.generate(
                model=config["openai"]["MODEL"],
                prompt=prompt,
                n=1,
                size="1024x1024",
                quality="standard",
                response_format="url",
            )
            print(f"dall-e revised prompt: {img.data[0].revised_prompt}")
            url = img.data[0].url
            print(f"Image URL: {url}")
            return url
        except Exception as e:
            # Deliberately broad: this is a best-effort call and any failure
            # (API error, malformed response) should lead to a retry or None.
            if attempt < attempts:
                # Record the failure instead of discarding it before retrying.
                logging.warning(
                    "Image generation attempt %d/%d failed, retrying: %s",
                    attempt,
                    attempts,
                    e,
                )
                # Wait a second before retrying
                time.sleep(1)
            else:
                logging.error(str(e))
    return None
def get_lyrics(song: str, artist: str) -> str | None:
    """Look up a song on Genius and return its lyrics with boilerplate trimmed.

    If the first search finds nothing, the search is repeated using only the
    part of the title before the first " - ".

    Args:
        song: The song title.
        artist: The artist name.

    Returns:
        The cleaned lyrics text, or None if no matching song was found.
    """
    obj = genius_client.search_song(song, artist)
    if not obj:
        # Retry with whatever follows the first " - " in the title removed.
        song = song.split(" - ", 1)[0].strip()
        obj = genius_client.search_song(song, artist)
    if not obj:
        return None

    raw = obj.lyrics
    # Drop everything up to and including the first "Lyrics" marker
    # (presumably the page header). When the marker is absent, keep the whole
    # text rather than failing with IndexError.
    _, marker, after_marker = raw.partition("Lyrics")
    body = after_marker if marker else raw
    return (
        body.split("You might also like")[0]
        .split("… Read More")[-1]
        .strip()
    )
def _wikidata_query(
    params: dict, request_error: str, decode_error: str, timeout: float = 10.0
) -> dict | None:
    """Send one GET request to the Wikidata API and return the decoded JSON body.

    Failures are logged and reported as None rather than raised.

    Args:
        params: Query-string parameters (requests URL-encodes them).
        request_error: Log message prefix used when the HTTP request fails.
        decode_error: Log message used when the body is not valid JSON.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        The decoded JSON payload, or None if the request or decoding failed.
    """
    # Wikidata API requires a User-Agent header to avoid 403 Forbidden errors
    headers = {"User-Agent": "Slop/1.0 (https://github.com/mannmann2/slop)"}
    try:
        # Without a timeout, requests can block indefinitely on a stalled server.
        response = requests.get(
            WIKIDATA_API_URL, params=params, headers=headers, timeout=timeout
        )
        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
    except requests.exceptions.RequestException as e:
        logging.error(f"{request_error}: {e}")
        return None
    # Decoding gets its own try block: recent requests versions raise a JSON
    # error that is both a RequestException and a ValueError, so a combined
    # try would never reach the ValueError handler.
    try:
        return response.json()
    except ValueError:  # Catches JSON decoding errors
        logging.error(decode_error)
        return None


def get_handle(artist: str) -> str | None:
    """
    Searches Wikidata for an artist and retrieves their Instagram username if available.

    The first search result is taken as the artist's entity; its claims are
    then fetched and the first value of the Instagram property is returned.

    Args:
        artist: The name of the artist to search for.

    Returns:
        The artist's Instagram username as a string, or None if not found
        or if any request fails.
    """
    search_results = _wikidata_query(
        {
            "action": "wbsearchentities",
            "search": artist,
            "language": "en",
            "format": "json",
        },
        f"Wikidata search request failed for '{artist}'",
        f"Failed to decode JSON from Wikidata search for '{artist}'",
    )
    if search_results is None:
        return None
    if not search_results.get("search"):
        logging.info(f"No Wikidata search results for artist: '{artist}'")
        return None

    # Only the top-ranked search hit is considered.
    qid = search_results["search"][0].get("id")
    if not qid:
        # Nothing to look up; skip the second request.
        logging.info(f"Top Wikidata search result for '{artist}' has no entity id")
        return None

    entity_data = _wikidata_query(
        {
            "action": "wbgetentities",
            "ids": qid,
            "format": "json",
            "props": "claims",
        },
        f"Wikidata entity request failed for QID '{qid}'",
        f"Failed to decode JSON from Wikidata entity request for QID '{qid}'",
    )
    if entity_data is None:
        return None

    # Safely navigate the nested structure to find the Instagram username
    try:
        claims = entity_data["entities"][qid]["claims"]
        instagram_claims = claims[INSTAGRAM_PROPERTY_ID]
        return instagram_claims[0]["mainsnak"]["datavalue"]["value"]
    except (KeyError, IndexError, TypeError):
        # This is not an error, just means the property doesn't exist for this entity.
        # TypeError covers a level that is not the expected mapping/list type.
        logging.info(f"Could not find Instagram username for '{artist}' (QID: {qid}).")
        return None