-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathserver.py
More file actions
executable file
·137 lines (115 loc) · 4.47 KB
/
server.py
File metadata and controls
executable file
·137 lines (115 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/python3
"""
Name: cuddlyclara
Website: cuddlyclara.de
Source: https://github.com/cuddlyclara/SimpleDoHServer
Description: Very simple DoH server based on Python 3, which passes the client IP via ECS.
"""
import json
import base64
import logging
import ipaddress
import http.server
import socketserver
import urllib.parse
import dns.message
import dns.edns
import dns.query
def is_valid_ipv4(ip_str):
try:
ipaddress.IPv4Address(ip_str)
return True
except ipaddress.AddressValueError:
return False
def requestDNSAnswer(query, clientip):
# Parse the DNS query message from wire format
request = dns.message.from_wire(query)
# Log DNS query details: type, domain, and client IP if available
if len(request.question) > 0:
question = request.question[0]
logging.info(f'query {dns.rdatatype.to_text(question.rdtype)} {question.name.to_text()} from {clientip}')
# Include EDNS with ECS option from client IP address if client IP address is valid
if is_valid_ipv4(clientip):
ecs = dns.edns.ECSOption.from_text(clientip + '/32')
request.use_edns(edns=True, options=[ecs])
else:
logging.warning(f'client IP {clientip} not valid using server IP for request')
# Send the query
response, fallback_used = dns.query.udp_with_fallback(request, dnsserver, timeout)
# Log a warning if fallback to TCP was necessary
if fallback_used:
logging.warning('fallback to TCP required')
# Return the DNS response
return response.to_wire()
def main():
# Enable address reuse to prevent 'Address already in use' error
socketserver.TCPServer.allow_reuse_address = True
# Create the DoH server
with socketserver.TCPServer((host, port), DohHandler) as httpd:
try:
print(f'Serving DoH on {host}:{port} using DNS server {dnsserver}')
httpd.serve_forever()
except KeyboardInterrupt:
print('Shutting down the server...')
httpd.shutdown()
class DohHandler(http.server.BaseHTTPRequestHandler):
def sendErrorResponse(self, code, e):
error_data = {
'error_code': code,
'error_type': type(e).__name__,
'error_message': str(e)
}
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'error': error_data}).encode('utf-8'))
def sendDoHResponse(self, dns_answer):
self.send_response(200)
self.send_header('Content-Type', 'application/dns-message')
self.end_headers()
self.wfile.write(dns_answer)
def do_GET(self):
try:
# Parse the dns query parameter
query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
dns_query = base64.b64decode(query['dns'][0])
except Exception as e:
# Provides a 'Bad Request' response in case of a parsing error
self.sendErrorResponse(400, e)
raise
try:
dns_answer = requestDNSAnswer(dns_query, self.headers[realipheader])
except Exception as e:
# Provides a 'Internal Server Error' response in case of a DNS resolution error
self.sendErrorResponse(500, e)
raise
# Respond with DoH response
self.sendDoHResponse(dns_answer)
def do_POST(self):
try:
# Parse the input stream
content_length = int(self.headers['Content-Length'])
dns_query = self.rfile.read(content_length)
except Exception as e:
# Provides a 'Bad Request' response in case of a parsing error
self.sendErrorResponse(400, e)
raise
try:
dns_answer = requestDNSAnswer(dns_query, self.headers[realipheader])
except Exception as e:
# Provides a 'Internal Server Error' response in case of a DNS resolution error
self.sendErrorResponse(500, e)
raise
# Respond with DoH response
self.sendDoHResponse(dns_answer)
if __name__ == '__main__':
# Set the LogLevel to logging.WARNING or logging.ERROR to suppress the output of DNS requests
logging.basicConfig(level=logging.INFO)
# Set the server address, port, dns server, dns request timeout (in seconds) and the real ip header
host = '127.0.0.1'
port = 8080
dnsserver = '10.10.10.10'
timeout = 10
realipheader = 'X-Forwarded-For'
# Call the main function
main()