forked from keksiqc/ctrld-sync
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
372 lines (297 loc) · 13.3 KB
/
main.py
File metadata and controls
372 lines (297 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
#!/usr/bin/env python3
"""
Control D Sync
----------------------
A tiny helper that keeps your Control D folders in sync with a set of
remote block-lists.
It does three things:
1. Reads the folder names from the JSON files.
2. Deletes any existing folders with those names (so we start fresh).
3. Re-creates the folders and pushes all rules in batches.
Nothing fancy, just works.
"""
import os
import logging
import time
from typing import Dict, List, Optional, Any, Set
import httpx
from dotenv import load_dotenv
# --------------------------------------------------------------------------- #
# 0. Bootstrap – load secrets and configure logging
# --------------------------------------------------------------------------- #
# Pull TOKEN / PROFILE from a local .env file into the process environment.
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(message)s",
datefmt="%H:%M:%S",
)
# Silence httpx's per-request INFO chatter; warnings and errors still show.
logging.getLogger("httpx").setLevel(logging.WARNING)
# Module-wide logger used by every helper below.
log = logging.getLogger("control-d-sync")
# --------------------------------------------------------------------------- #
# 1. Constants – tweak only here
# --------------------------------------------------------------------------- #
API_BASE = "https://api.controld.com/profiles"
# API token; may be None here — main() aborts if it is missing.
TOKEN = os.getenv("TOKEN")
# Accept either a single profile id or a comma-separated list
PROFILE_IDS = [p.strip() for p in os.getenv("PROFILE", "").split(",") if p.strip()]
# URLs of the JSON block-lists we want to import
FOLDER_URLS = [
"https://raw.githubusercontent.com/hagezi/dns-blocklists/main/controld/native-tracker-tiktok-aggressive-folder.json",
"https://raw.githubusercontent.com/hagezi/dns-blocklists/refs/heads/main/controld/native-tracker-xiaomi-folder.json",
"https://raw.githubusercontent.com/hagezi/dns-blocklists/refs/heads/main/controld/nosafesearch-folder.json",
]
# Max hostnames per rules POST (see push_rules); keeps request bodies small.
BATCH_SIZE = 500
# Retry policy: MAX_RETRIES attempts, exponential backoff starting at RETRY_DELAY.
MAX_RETRIES = 3
RETRY_DELAY = 1 # seconds
FOLDER_CREATION_DELAY = 2 # seconds to wait after creating a folder
# --------------------------------------------------------------------------- #
# 2. Clients
# --------------------------------------------------------------------------- #
# Control-D API client (with auth)
# NOTE(review): if TOKEN is None this header becomes "Bearer None"; main()
# validates TOKEN before any request is made, so this is only cosmetic.
_api = httpx.Client(
headers={
"Accept": "application/json",
"Authorization": f"Bearer {TOKEN}",
},
timeout=30,
)
# GitHub raw client (no auth, no headers)
_gh = httpx.Client(timeout=30)
# --------------------------------------------------------------------------- #
# 3. Helpers
# --------------------------------------------------------------------------- #
# simple in-memory cache: url -> decoded JSON
# Lives for the whole process, so when several profiles are synced (see main)
# each block-list is downloaded only once.
_cache: Dict[str, Dict] = {}
def _api_get(url: str) -> httpx.Response:
    """Issue a GET against the Control-D API, retried on failure."""
    def _do() -> httpx.Response:
        return _api.get(url)

    return _retry_request(_do)
def _api_delete(url: str) -> httpx.Response:
    """Issue a DELETE against the Control-D API, retried on failure."""
    def _do() -> httpx.Response:
        return _api.delete(url)

    return _retry_request(_do)
def _api_post(url: str, data: Dict) -> httpx.Response:
    """Issue a POST against the Control-D API, retried on failure."""
    def _do() -> httpx.Response:
        return _api.post(url, data=data)

    return _retry_request(_do)
def _api_post_form(url: str, data: Dict) -> httpx.Response:
    """Issue a retried POST with an explicit form-urlencoded content type."""
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}

    def _do() -> httpx.Response:
        return _api.post(url, data=data, headers=form_headers)

    return _retry_request(_do)
def _retry_request(request_func, max_retries=MAX_RETRIES, delay=RETRY_DELAY):
    """Run *request_func* with retries and exponential backoff.

    Parameters
    ----------
    request_func : zero-argument callable returning an ``httpx.Response``
    max_retries  : total number of attempts; must be >= 1
    delay        : base backoff in seconds; attempt *k* waits ``delay * 2**k``

    Returns the successful response (after ``raise_for_status``); re-raises
    the last error once all attempts are exhausted.

    Raises ``ValueError`` for a non-positive ``max_retries`` — the original
    silently returned ``None`` in that case, which would crash callers later
    on ``.json()``.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            response = request_func()
            response.raise_for_status()
            return response
        # NOTE(review): httpx.TimeoutException appears to derive from
        # httpx.HTTPError; it is kept explicit here for readability.
        except (httpx.HTTPError, httpx.TimeoutException) as e:
            if attempt == max_retries - 1:
                # Surface the server's reply (if any) before giving up;
                # only HTTPStatusError-style exceptions carry a .response.
                if hasattr(e, 'response') and e.response is not None:
                    log.error("Response content: %s", e.response.text)
                raise
            wait_time = delay * (2 ** attempt)
            log.warning(
                "Request failed (attempt %d/%d): %s. Retrying in %ss...",
                attempt + 1, max_retries, e, wait_time,
            )
            time.sleep(wait_time)
def _gh_get(url: str) -> Dict:
    """Fetch and decode a JSON document from GitHub, memoised in ``_cache``."""
    try:
        return _cache[url]
    except KeyError:
        resp = _gh.get(url)
        resp.raise_for_status()
        payload = resp.json()
        _cache[url] = payload
        return payload
def list_existing_folders(profile_id: str) -> Dict[str, str]:
    """Return folder-name -> folder-id mapping (empty dict on API failure)."""
    try:
        payload = _api_get(f"{API_BASE}/{profile_id}/groups").json()
        mapping: Dict[str, str] = {}
        for entry in payload.get("body", {}).get("groups", []):
            name = entry.get("group")
            pk = entry.get("PK")
            if name and pk:
                mapping[name.strip()] = pk
        return mapping
    except (httpx.HTTPError, KeyError) as e:
        log.error(f"Failed to list existing folders: {e}")
        return {}
def _rule_pks(url: str) -> Set[str]:
    """Fetch one rules listing and return the set of rule PKs it contains."""
    data = _api_get(url).json()
    return {r["PK"] for r in data.get("body", {}).get("rules", []) if r.get("PK")}
def get_all_existing_rules(profile_id: str) -> Set[str]:
    """Collect the PKs of every rule in the profile (root plus all folders).

    Best-effort by design: folders that fail to load are skipped with a
    warning, and an empty set is returned on any unexpected error so the
    sync can proceed without duplicate filtering.
    """
    all_rules: Set[str] = set()
    try:
        # Rules in the root folder (no folder id in the URL).
        try:
            root = _rule_pks(f"{API_BASE}/{profile_id}/rules")
            all_rules |= root
            log.debug("Found %d rules in root folder", len(root))
        except httpx.HTTPError as e:
            log.warning(f"Failed to get root folder rules: {e}")
        # Rules in every folder, including ones this script does not manage.
        for folder_name, folder_id in list_existing_folders(profile_id).items():
            try:
                in_folder = _rule_pks(f"{API_BASE}/{profile_id}/rules/{folder_id}")
                all_rules |= in_folder
                log.debug("Found %d rules in folder '%s'", len(in_folder), folder_name)
            except httpx.HTTPError as e:
                log.warning(f"Failed to get rules from folder '{folder_name}': {e}")
                continue
        log.info(f"Total existing rules across all folders: {len(all_rules)}")
        return all_rules
    except Exception as e:
        # Deliberately broad: duplicate detection is an optimisation, not a
        # requirement — never let it abort the sync.
        log.error(f"Failed to get existing rules: {e}")
        return set()
def fetch_folder_data(url: str) -> Dict[str, Any]:
    """Download (with caching) the folder-definition JSON from GitHub."""
    return _gh_get(url)
def delete_folder(profile_id: str, name: str, folder_id: str) -> bool:
    """Delete one folder by its ID; return ``True`` on success."""
    try:
        _api_delete(f"{API_BASE}/{profile_id}/groups/{folder_id}")
    except httpx.HTTPError as e:
        log.error(f"Failed to delete folder '{name}' (ID {folder_id}): {e}")
        return False
    log.info("Deleted folder '%s' (ID %s)", name, folder_id)
    return True
def create_folder(profile_id: str, name: str, do: int, status: int) -> Optional[str]:
    """Create a new folder and return its ID (``None`` on failure).

    The API returns the full list of groups, so after creating we re-list
    the groups and pick out the one whose (stripped) name matches.
    """
    try:
        _api_post(
            f"{API_BASE}/{profile_id}/groups",
            data={"name": name, "do": do, "status": status},
        )
        groups = _api_get(f"{API_BASE}/{profile_id}/groups").json()["body"]["groups"]
        wanted = name.strip()
        for grp in groups:
            if grp["group"].strip() == wanted:
                log.info("Created folder '%s' (ID %s)", name, grp["PK"])
                # Give the backend a moment before rules are pushed into it.
                time.sleep(FOLDER_CREATION_DELAY)
                return str(grp["PK"])
    except (httpx.HTTPError, KeyError) as e:
        log.error(f"Failed to create folder '{name}': {e}")
        return None
    log.error(f"Folder '{name}' was not found after creation")
    return None
def push_rules(
    profile_id: str,
    folder_name: str,
    folder_id: str,
    do: int,
    status: int,
    hostnames: List[str],
    existing_rules: Set[str],
) -> bool:
    """Push hostnames in batches to the given folder, skipping duplicates.

    ``existing_rules`` is mutated in place: every successfully pushed batch
    is added to it so later folders in the same run skip those hostnames.
    Returns ``True`` only if every batch succeeded.
    """
    if not hostnames:
        log.info("Folder '%s' - no rules to push", folder_name)
        return True
    # Drop hostnames that already exist anywhere in the profile.
    original_count = len(hostnames)
    filtered_hostnames = [h for h in hostnames if h not in existing_rules]
    duplicates_count = original_count - len(filtered_hostnames)
    if duplicates_count > 0:
        log.info(f"Folder '{folder_name}': skipping {duplicates_count} duplicate rules")
    if not filtered_hostnames:
        log.info(f"Folder '{folder_name}' - no new rules to push after filtering duplicates")
        return True
    successful_batches = 0
    # Ceil division instead of the original len(range(...)) construction.
    total_batches = -(-len(filtered_hostnames) // BATCH_SIZE)
    for i, start in enumerate(range(0, len(filtered_hostnames), BATCH_SIZE), 1):
        batch = filtered_hostnames[start : start + BATCH_SIZE]
        # The API expects indexed form fields: hostnames[0], hostnames[1], ...
        data = {
            "do": str(do),
            "status": str(status),
            "group": str(folder_id),
        }
        for j, hostname in enumerate(batch):
            data[f"hostnames[{j}]"] = hostname
        try:
            _api_post_form(
                f"{API_BASE}/{profile_id}/rules",
                data=data,
            )
        except httpx.HTTPError as e:
            log.error(f"Failed to push batch {i} for folder '{folder_name}': {e}")
            if hasattr(e, 'response') and e.response is not None:
                log.error(f"Response content: {e.response.text}")
            continue
        log.info(
            "Folder '%s' – batch %d: added %d rules",
            folder_name,
            i,
            len(batch),
        )
        successful_batches += 1
        # Record the new rules so subsequent folders treat them as duplicates.
        existing_rules.update(batch)
    if successful_batches == total_batches:
        log.info("Folder '%s' – finished (%d new rules added)", folder_name, len(filtered_hostnames))
        return True
    # BUG FIX: the original put '%s' inside an f-string here, so the folder
    # name was never substituted and the literal "%s" was logged.
    log.error(f"Folder '{folder_name}' – only {successful_batches}/{total_batches} batches succeeded")
    return False
# --------------------------------------------------------------------------- #
# 4. Main workflow
# --------------------------------------------------------------------------- #
def sync_profile(profile_id: str) -> bool:
    """One-shot sync: delete old, create new, push rules. Returns True if successful."""
    try:
        # Download every folder definition up front; skip the broken ones.
        definitions = []
        for url in FOLDER_URLS:
            try:
                definitions.append(fetch_folder_data(url))
            except (httpx.HTTPError, KeyError) as e:
                log.error(f"Failed to fetch folder data from {url}: {e}")
        if not definitions:
            log.error("No valid folder data found")
            return False
        # Drop any managed folders that already exist so we start fresh.
        current_folders = list_existing_folders(profile_id)
        for definition in definitions:
            folder_name = definition["group"]["group"].strip()
            if folder_name in current_folders:
                delete_folder(profile_id, folder_name, current_folders[folder_name])
        # Snapshot existing rules only AFTER the deletions above, so the
        # rules we are about to re-add do not count as duplicates.
        existing_rules = get_all_existing_rules(profile_id)
        # Recreate each folder and push its rule set.
        success_count = 0
        for definition in definitions:
            grp = definition["group"]
            folder_name = grp["group"].strip()
            action_do = grp["action"]["do"]
            action_status = grp["action"]["status"]
            rule_pks = [r["PK"] for r in definition.get("rules", []) if r.get("PK")]
            new_id = create_folder(profile_id, folder_name, action_do, action_status)
            # push_rules updates existing_rules in place as batches succeed.
            if new_id and push_rules(
                profile_id,
                folder_name,
                new_id,
                action_do,
                action_status,
                rule_pks,
                existing_rules,
            ):
                success_count += 1
        log.info(f"Sync complete: {success_count}/{len(definitions)} folders processed successfully")
        return success_count == len(definitions)
    except Exception as e:
        log.error(f"Unexpected error during sync for profile {profile_id}: {e}")
        return False
# --------------------------------------------------------------------------- #
# 5. Entry-point
# --------------------------------------------------------------------------- #
def main() -> None:
    """Validate configuration, sync every profile, exit non-zero on failure."""
    if not TOKEN or not PROFILE_IDS:
        log.error("TOKEN and/or PROFILE missing - check your .env file")
        # `exit()` is a site-module convenience that can be absent (python -S);
        # raising SystemExit is the portable equivalent with identical effect.
        raise SystemExit(1)
    success_count = 0
    for profile_id in PROFILE_IDS:
        log.info("Starting sync for profile %s", profile_id)
        if sync_profile(profile_id):
            success_count += 1
    log.info(f"All profiles processed: {success_count}/{len(PROFILE_IDS)} successful")
    raise SystemExit(0 if success_count == len(PROFILE_IDS) else 1)
if __name__ == "__main__":
    main()