forked from cnwikee/CheckTMDB
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcheck_tmdb_github_dnschecked.py
More file actions
327 lines (280 loc) · 13 KB
/
check_tmdb_github_dnschecked.py
File metadata and controls
327 lines (280 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import requests
from time import sleep
import random
import time
import os
import sys
import re # 导入正则模块用于IP验证
from datetime import datetime, timezone, timedelta
from retry import retry
import socket
DOMAINS = [
'tmdb.org',
'api.tmdb.org',
'files.tmdb.org',
'themoviedb.org',
'api.themoviedb.org',
'www.themoviedb.org',
'auth.themoviedb.org',
'image.tmdb.org',
'images.tmdb.org',
'imdb.com',
'www.imdb.com',
'secure.imdb.com',
's.media-imdb.com',
'us.dd.imdb.com',
'www.imdb.to',
'origin-www.imdb.com',
'ia.media-imdb.com',
'thetvdb.com',
'api.thetvdb.com',
'ia.media-imdb.com',
'f.media-amazon.com',
'imdb-video.media-imdb.com'
]
Tmdb_Host_TEMPLATE = """# Tmdb Hosts Start
{content}
# Update time: {update_time}
# IPv4 Update url: https://raw.githubusercontent.com/cnwikee/CheckTMDB/refs/heads/main/Tmdb_host_ipv4
# IPv6 Update url: https://raw.githubusercontent.com/cnwikee/CheckTMDB/refs/heads/main/Tmdb_host_ipv6
# Star me: https://github.com/cnwikee/CheckTMDB
# Tmdb Hosts End\n"""
def write_file(ipv4_hosts_content: str, ipv6_hosts_content: str, update_time: str) -> bool:
output_doc_file_path = os.path.join(os.path.dirname(__file__), "README.md")
template_path = os.path.join(os.path.dirname(__file__), "README_template.md")
if os.path.exists(output_doc_file_path):
with open(output_doc_file_path, "r", encoding='utf-8') as old_readme_md:
old_readme_md_content = old_readme_md.read()
if old_readme_md_content:
old_ipv4_block = old_readme_md_content.split("```bash")[1].split("```")[0].strip()
old_ipv4_hosts = old_ipv4_block.split("# Update time:")[0].strip()
old_ipv6_block = old_readme_md_content.split("```bash")[2].split("```")[0].strip()
old_ipv6_hosts = old_ipv6_block.split("# Update time:")[0].strip()
if ipv4_hosts_content != "":
new_ipv4_hosts = ipv4_hosts_content.split("# Update time:")[0].strip()
if old_ipv4_hosts == new_ipv4_hosts:
print("ipv4 host not change")
w_ipv4_block = old_ipv4_block
else:
w_ipv4_block = ipv4_hosts_content
write_host_file(ipv4_hosts_content, 'ipv4')
else:
print("ipv4_hosts_content is null")
w_ipv4_block = old_ipv4_block
if ipv6_hosts_content != "":
new_ipv6_hosts = ipv6_hosts_content.split("# Update time:")[0].strip()
if old_ipv6_hosts == new_ipv6_hosts:
print("ipv6 host not change")
w_ipv6_block = old_ipv6_block
else:
w_ipv6_block = ipv6_hosts_content
write_host_file(ipv6_hosts_content, 'ipv6')
else:
print("ipv6_hosts_content is null")
w_ipv6_block = old_ipv6_block
with open(template_path, "r", encoding='utf-8') as temp_fb:
template_str = temp_fb.read()
hosts_content = template_str.format(ipv4_hosts_str=w_ipv4_block, ipv6_hosts_str=w_ipv6_block, update_time=update_time)
with open(output_doc_file_path, "w", encoding='utf-8') as output_fb:
output_fb.write(hosts_content)
return True
return False
def validate_ip(ip):
"""
验证IP是否为合法的IPv4或IPv6地址
:param ip: 待验证的IP字符串
:return: True(合法)/False(非法)
"""
# IPv4正则(严格验证:每个段0-255,无前置零(除0.0.0.0等合法场景))
ipv4_pattern = r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
# IPv6正则(兼容压缩格式、本地链路地址、IPv4映射地址等所有合法格式)
ipv6_pattern = r'^(?:(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9]))$'
# 忽略大小写验证IPv6,优先验证IPv4
if re.match(ipv4_pattern, ip):
return True
elif re.match(ipv6_pattern, ip, re.IGNORECASE):
return True
else:
return False
def write_host_file(hosts_content: str, filename: str) -> None:
output_file_path = os.path.join(os.path.dirname(__file__), "Tmdb_host_" + filename)
if len(sys.argv) >= 2 and sys.argv[1].upper() == '-G':
print("\n~追加Github ip~")
hosts_content = hosts_content + "\n" + (get_github_hosts() or "")
with open(output_file_path, "w", encoding='utf-8') as output_fb:
output_fb.write(hosts_content)
print("\n~最新TMDB" + filename + "地址已更新~")
def get_github_hosts() -> None:
github_hosts_urls = [
"https://hosts.gitcdn.top/hosts.txt",
"https://raw.githubusercontent.com/521xueweihan/GitHub520/refs/heads/main/hosts",
"https://gitlab.com/ineo6/hosts/-/raw/master/next-hosts",
"https://raw.githubusercontent.com/ittuann/GitHub-IP-hosts/refs/heads/main/hosts_single"
]
all_failed = True
for url in github_hosts_urls:
try:
response = requests.get(url)
if response.status_code == 200:
github_hosts = response.text
all_failed = False
break
else:
print(f"\n从 {url} 获取GitHub hosts失败: HTTP {response.status_code}")
except Exception as e:
print(f"\n从 {url} 获取GitHub hosts时发生错误: {str(e)}")
if all_failed:
print("\n获取GitHub hosts失败: 所有Url项目失败!")
return
else:
return github_hosts
def is_ci_environment():
ci_environment_vars = {
'GITHUB_ACTIONS': 'true',
'TRAVIS': 'true',
'CIRCLECI': 'true'
}
for env_var, expected_value in ci_environment_vars.items():
env_value = os.getenv(env_var)
if env_value is not None and str(env_value).lower() == expected_value.lower():
return True
return False
@retry(tries=3)
def get_domain_ips(domain, record_type):
"""
从Google DNS获取域名的A/AAAA记录IP列表
:param domain: 目标域名(如 tmdb.org)
:param record_type: 记录类型(A/AAAA,或数字1/28)
:return: 去重后的IP列表
"""
all_ips = [] # 存储所有DNS服务器返回的IP
print(f"正在从Google DNS获取 {domain} 的{record_type}记录...")
url = f'https://dns.google/resolve'
headers = {
"accept": "*/*",
"accept-encoding": "gzip, deflate, br, zstd",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"content-type": "application/json; charset=UTF-8", # 关键:指定JSON格式负载
"referer": f"https://dns.google/query?name={domain}&rr_type={record_type}&ecs=",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
}
params = {
'name': domain,
'type': record_type
}
# 初始化IP列表(默认空列表,确保后续使用安全)
ips_str = []
try:
# 改用GET请求(Google DNS resolve接口标准用法)
response = requests.get(url, headers=headers, params=params, timeout=10)
response.raise_for_status() # 主动抛出HTTP错误(如4xx/5xx)
# 解析JSON响应
data = response.json()
if not isinstance(data, dict):
print("返回数据不是字典格式,无法解析")
return all_ips
# 核心:提取Answer数组中的data字段(IP地址)
answer_list = data.get("Answer", [])
if not answer_list:
print(f"未找到 {domain} 的{record_type}记录(Answer字段为空)")
return all_ips
# 遍历Answer数组,提取每个条目的data值(IP)
for answer in answer_list:
ip = answer.get("data")
if not ip: # 过滤空值
continue
# 验证IP格式合法性
if validate_ip(ip):
all_ips.append(ip)
print(f"提取到合法IP:{ip}")
else:
print(f"跳过非法IP格式:{ip}")
except requests.exceptions.RequestException as e:
# 捕获所有网络/请求异常
print(f"请求Google DNS失败:{e}")
if hasattr(e, 'response') and e.response:
print(f"响应内容:{e.response.text[:500]}") # 打印前500字符避免过长
except ValueError:
# JSON解析失败
print(f"响应内容不是有效的JSON格式:{response.text[:500]}")
time.sleep(1)
# 去重并返回(保持列表格式)
unique_ips = list(set(all_ips))
print(f"最终提取到 {domain} 的{record_type}记录IP(去重后):{unique_ips}")
return unique_ips
def ping_ip(ip, port=80):
print(f"使用TCP连接测试IP地址的延迟(毫秒)")
try:
print(f"\n开始 ping {ip}...")
start_time = time.time()
with socket.create_connection((ip, port), timeout=2) as sock:
latency = (time.time() - start_time) * 1000 # 转换为毫秒
print(f"IP: {ip} 的平均延迟: {latency}ms")
return latency
except Exception as e:
print(f"Ping {ip} 时发生错误: {str(e)}")
return float('inf')
def find_fastest_ip(ips):
"""找出延迟最低的IP地址"""
if not ips:
return None
fastest_ip = None
min_latency = float('inf')
ip_latencies = [] # 存储所有IP及其延迟
for ip in ips:
ip = ip.strip()
if not ip:
continue
print(f"正在测试 IP: {ip}")
latency = ping_ip(ip)
ip_latencies.append((ip, latency))
print(f"IP: {ip} 延迟: {latency}ms")
if latency < min_latency:
min_latency = latency
fastest_ip = ip
sleep(0.5)
print("\n所有IP延迟情况:")
for ip, latency in ip_latencies:
print(f"IP: {ip} - 延迟: {latency}ms")
if fastest_ip:
print(f"\n最快的IP是: {fastest_ip},延迟: {min_latency}ms")
return fastest_ip
def main():
print("开始检测TMDB相关域名的最快IP...")
ipv4_ips, ipv6_ips, ipv4_results, ipv6_results = [], [], [], []
for domain in DOMAINS:
print(f"\n正在处理域名: {domain}")
ipv4_ips = get_domain_ips(domain, "A")
ipv6_ips = get_domain_ips(domain, "AAAA")
if not ipv4_ips and not ipv6_ips:
print(f"无法获取 {domain} 的IP列表,跳过该域名")
continue
# 处理 IPv4 地址
if ipv4_ips:
fastest_ipv4 = find_fastest_ip(ipv4_ips)
if fastest_ipv4:
ipv4_results.append([fastest_ipv4, domain])
print(f"域名 {domain} 的最快IPv4是: {fastest_ipv4}")
else:
ipv4_results.append([ipv4_ips[0], domain])
# 处理 IPv6 地址
if ipv6_ips:
fastest_ipv6 = find_fastest_ip(ipv6_ips)
if fastest_ipv6:
ipv6_results.append([fastest_ipv6, domain])
print(f"域名 {domain} 的最快IPv6是: {fastest_ipv6}")
else:
# 兜底:可能存在无法正确获取 fastest_ipv6 的情况,则将第一个IP赋值
ipv6_results.append([ipv6_ips[0], domain])
sleep(1) # 避免请求过于频繁
# 保存结果到文件
if not ipv4_results and not ipv6_results:
print(f"程序出错:未获取任何domain及对应IP,请检查接口~")
sys.exit(1)
# 生成更新时间
update_time = datetime.now(timezone(timedelta(hours=8))).replace(microsecond=0).isoformat()
ipv4_hosts_content = Tmdb_Host_TEMPLATE.format(content="\n".join(f"{ip:<27} {domain}" for ip, domain in ipv4_results), update_time=update_time) if ipv4_results else ""
ipv6_hosts_content = Tmdb_Host_TEMPLATE.format(content="\n".join(f"{ip:<50} {domain}" for ip, domain in ipv6_results), update_time=update_time) if ipv6_results else ""
write_file(ipv4_hosts_content, ipv6_hosts_content, update_time)
if __name__ == "__main__":
main()