-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
227 lines (185 loc) · 7.4 KB
/
main.py
File metadata and controls
227 lines (185 loc) · 7.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import json
import logging
import time
from configparser import ConfigParser
import pandas as pd
import requests
from constants import CAPTION, IMG_PROMPT
from instagram import Instagram
from spotify import Spotify
from utils import create_hashtag, get_handle, get_image, get_img_prompt, get_lyrics
# Log INFO and above as "<timestamp> [<LEVEL>] <message>".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Credentials come from config.ini, resolved against the working directory.
config = ConfigParser()
config.read("config.ini")

_instagram_cfg = config["instagram"]
IG_ID = _instagram_cfg["IG_ID"]
INSTA_TOKEN = _instagram_cfg["TOKEN"]

_spotify_cfg = config["spotify"]
CLIENT_ID = _spotify_cfg["CLIENT_ID"]
CLIENT_SECRET = _spotify_cfg["CLIENT_SECRET"]

# Base URL of the CDN service that receives a copy of each generated image.
CDN_BASE = "http://192.168.0.5:8000"
CDN_PASSWORD = config["cdn"]["PASSWORD"]

# Module-level cache for the CDN bearer token, maintained by _get_cdn_token().
_cdn_token: str | None = None
_cdn_token_expiry: float = 0.0  # epoch seconds after which the token is re-fetched
def _get_cdn_token() -> str:
    """Return a bearer token for the CDN, logging in only when needed.

    The token is cached in the module-level ``_cdn_token`` together with its
    expiry timestamp ``_cdn_token_expiry``. A new login request is made when
    no token is cached or the cached one has reached its expiry.

    Raises:
        requests.HTTPError: if the login endpoint answers with an error status.
    """
    global _cdn_token, _cdn_token_expiry

    if not _cdn_token or time.time() >= _cdn_token_expiry:
        login = requests.post(
            f"{CDN_BASE}/api/auth/login",
            json={"password": CDN_PASSWORD},
            timeout=10,
        )
        login.raise_for_status()
        _cdn_token = login.json()["token"]
        # Cache for 29 days; the original note says this refreshes a day
        # early, so the token presumably lives for 30 days.
        cache_seconds = 29 * 24 * 3600
        _cdn_token_expiry = time.time() + cache_seconds

    return _cdn_token
def upload_to_cdn(image_url: str) -> None:
    """Download the image at ``image_url`` and upload it to the CDN.

    The image is sent as a multipart upload named ``photo.jpg`` with the
    bearer token from ``_get_cdn_token()``.

    Args:
        image_url: URL of the image to copy to the CDN.

    Raises:
        requests.RequestException: if the login, the download or the upload
            fails (network error or non-success HTTP status).
    """
    token = _get_cdn_token()

    download = requests.get(image_url, timeout=30)
    # Fail here rather than uploading an error page's body as "photo.jpg".
    download.raise_for_status()

    resp = requests.post(
        f"{CDN_BASE}/api/cdn/upload",
        headers={"Authorization": f"Bearer {token}"},
        files={"files": ("photo.jpg", download.content, "image/jpeg")},
        timeout=30,
    )
    resp.raise_for_status()
    logging.info(f"CDN upload response: {resp.json()}")
# Switch for the image-generation flow used when lyrics are available.
ENABLE_GENAI = True
# Set to an image URL to use that image instead of generating a new one;
# leave as None to generate.
GEN_IMG_URL = None

# The parsed contents of user.json are handed to the Spotify client as `user`.
with open("user.json") as user_file:
    user = json.loads(user_file.read())

spot = Spotify(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, user=user)
insta = Instagram(IG_ID, INSTA_TOKEN)
def is_album_playback(current: dict) -> bool:
    """Return True when the current track is being played from its own album.

    Spotify may report ``context`` as null (for example when playback was not
    started from an album or playlist page); that is treated as "not an album"
    instead of raising ``TypeError``.

    Args:
        current: Playback state as returned by ``spot.get_currently_playing()``.
    """
    context = current.get("context") or {}
    return (
        context.get("type") == "album"
        and context.get("uri") == current["item"]["album"]["uri"]
    )


def _generate_image(current: dict, track: str, artist: str, lyrics: str) -> tuple:
    """Return ``(image_url, image_prompt)`` for the generated artwork.

    ``GEN_IMG_URL`` takes precedence and yields an empty prompt. Otherwise,
    when ``ENABLE_GENAI`` is on, a prompt is built from the song, artist,
    lyrics and artist genres and passed to ``get_image``. With both switches
    off the result is ``(None, "")``.
    """
    if GEN_IMG_URL:
        return GEN_IMG_URL, ""
    if not ENABLE_GENAI:
        return None, ""

    # Genres of the primary artist are added to the prompt when available.
    artist_obj = spot.get_artists(ids=[current["item"]["artists"][0]["id"]])
    genres = ", ".join(artist_obj[0]["genres"])
    if genres:
        logging.info(f"Genres: {genres}")
        genres = f"`{genres}` "

    prompt = IMG_PROMPT.format(
        song=track,
        artist=artist,
        lyrics=lyrics,
        genres=genres,
    )
    img_prompt = get_img_prompt(prompt)
    print()
    print(img_prompt)
    return get_image(img_prompt), img_prompt


def _build_caption(current: dict, track: str, artist: str, lyrics_str: str) -> tuple:
    """Return ``(caption, handle)`` for the post.

    ``handle`` is whatever ``get_handle(artist)`` returns. When it is truthy
    the artist is mentioned as ``@handle`` and every artist is hashtagged;
    otherwise the primary artist's hashtag takes the mention's place and only
    the remaining artists are hashtagged. The album is always hashtagged.
    """
    item = current["item"]
    handle = get_handle(artist)
    if handle:
        hash_items = item["artists"] + [item["album"]]
        artist_tag = "@" + handle
    else:
        hash_items = item["artists"][1:] + [item["album"]]
        artist_tag = create_hashtag(artist)
    # A set drops duplicate tags (e.g. a self-titled album).
    hashtags = " ".join({create_hashtag(x["name"]) for x in hash_items})

    caption = CAPTION.format(
        track=track,
        artist=artist,
        lyrics=lyrics_str,
        handle=artist_tag,
        hashtags=hashtags,
    )
    # 2200 is presumably Instagram's caption length limit; when the caption
    # is too long the lyrics are dropped entirely.
    if len(caption) > 2200:
        caption = CAPTION.format(
            track=track,
            artist=artist,
            lyrics="",
            handle=artist_tag,
            hashtags=hashtags,
        )
    return caption, handle


def _archive_track(df: pd.DataFrame, new_row: dict) -> None:
    """Append ``new_row`` to archive.tsv unless it equals the last archived row."""
    last_row = df.iloc[-1] if not df.empty else None
    if last_row is None or any(last_row[col] != new_row[col] for col in new_row):
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
        df.to_csv("archive.tsv", index=False, sep="\t")


def _publish_carousel(gen_img_url, image_url, caption, handle, img_prompt) -> None:
    """Create a two-image carousel (generated image, album cover) and post it."""
    # Use the image prompt as alt text only if it is within the char limit.
    alt_text = img_prompt if len(img_prompt) <= 1000 else None
    img1 = insta.create_container(
        url=gen_img_url,
        is_carousel=True,
        user_tag=handle,
        alt_text=alt_text,
    )
    logging.info(f"Created gen img container: {img1}")
    img2 = insta.create_container(url=image_url, is_carousel=True)
    logging.info(f"Created album container: {img2}")
    res = insta.create_carousel(
        caption=caption, children=[img1["id"], img2["id"]], user_tag=handle
    )
    logging.info(f"Created carousel: {res}")

    # Short pause before publishing; presumably lets Instagram finish
    # processing the containers.
    time.sleep(10)
    res = insta.post_media(res["id"])
    logging.info(f"Posted media: {res}")


def _process_playback(current: dict) -> None:
    """Post the currently playing album to Instagram if it qualifies.

    A post is prepared only when the album is not among the last 30 archived
    rows and the track is being played from its own album. Nothing is
    published unless a generated image is available.
    """
    item = current.get("item")
    if not item:
        # Spotify can report a playback state without a track item.
        logging.info("Playback state has no track item, skipping...")
        return

    track = item["name"]
    artist = item["artists"][0]["name"]
    album = item["album"]["name"]
    album_id = item["album"]["id"]
    logging.info(f"Currently playing: `{track}` by `{artist}` / `{album}`")

    df = pd.read_csv("archive.tsv", sep="\t")
    recent_album_ids = df.tail(30)["album_id"].values
    if album_id in recent_album_ids or not is_album_playback(current):
        return

    logging.info("Preparing new post...")
    gen_img_url = None
    img_prompt = ""
    image_url = item["album"]["images"][0]["url"]
    lyrics_str = get_lyrics(track, artist) or ""

    # The generated-image flow only runs when lyrics were found.
    if lyrics_str:
        gen_img_url, img_prompt = _generate_image(current, track, artist, lyrics_str)
        # Separator between the lyrics and the rest of the caption.
        lyrics_str += "\n\n"
    else:
        logging.info("No lyrics found...")

    caption, handle = _build_caption(current, track, artist, lyrics_str)

    # NOTE(review): publishing is kept strictly inside the generated-image
    # path so post_media never runs without a freshly created carousel.
    if not gen_img_url:
        logging.info("No generated image available, skipping post...")
        return

    _archive_track(
        df,
        {
            "track": track,
            "album": album,
            "artist": artist,
            "track_id": item["id"],
            "album_id": album_id,
        },
    )

    # The CDN copy is best-effort: a failure must not block the Instagram post.
    try:
        upload_to_cdn(gen_img_url)
    except Exception as cdn_err:
        logging.warning(
            f"CDN upload failed (continuing with Instagram post): {cdn_err}"
        )

    _publish_carousel(gen_img_url, image_url, caption, handle, img_prompt)


def main() -> None:
    """Poll Spotify forever and post newly played albums to Instagram.

    Sleeps 120 seconds when nothing is playing and 90 seconds between polls
    otherwise. Any error in one iteration is logged with its traceback and
    the loop carries on.
    """
    while True:
        try:
            current = spot.get_currently_playing()
            if not current:
                logging.info("Nothing is playing...")
                time.sleep(120)
                continue

            # Keep the latest playback state on disk.
            with open("current.json", "w") as f:
                json.dump(current, f)

            _process_playback(current)
        except Exception as e:
            # Top-level boundary: log with traceback and keep polling.
            logging.exception("Error: %s", e)
        time.sleep(90)


# Running the file as a script starts the loop exactly as before; the guard
# only keeps an import of this module from blocking forever.
if __name__ == "__main__":
    main()