Skip to content

Commit 73aaea8

Browse files
committed
Add more multi-threading to uptime query. Add batch processing of UPDATE. Processing update query to remote database or local depending on user.
1 parent 3afb9d2 commit 73aaea8

2 files changed

Lines changed: 72 additions & 112 deletions

File tree

meile_node_uptime.py

Lines changed: 71 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import concurrent.futures
1212
from dbutils.pooled_db import PooledDB
1313

14-
VERSION = 20250227.1502
15-
WORKERS = 30
14+
VERSION = 20250304.1738
15+
WORKERS = 50 # edit. This should be plenty.
1616
APIURL = 'https://api.sentinel.mathnodes.com'
1717

1818
class UpdateNodeUptime():
@@ -21,7 +21,17 @@ def connDB(self):
2121

2222
self.db_pool = PooledDB(
2323
creator=pymysql,
24-
maxconnections=30,
24+
maxconnections=50,
25+
host=scrtsxx.RHOST,
26+
port=scrtsxx.PORT,
27+
user=scrtsxx.USERNAME,
28+
passwd=scrtsxx.PASSWORD,
29+
database=scrtsxx.DB,
30+
)
31+
32+
self.local_db_pool = PooledDB(
33+
creator=pymysql,
34+
maxconnections=50,
2535
host=scrtsxx.HOST,
2636
port=scrtsxx.PORT,
2737
user=scrtsxx.USERNAME,
@@ -52,28 +62,20 @@ def get_node_uptime_table(self,db):
5262
def get_remote_url_of_node(self, db, NodeData):
5363
NodeRemoteURL = {'address' : [], 'url' : []}
5464

55-
# Retrieve nodes with empty remote_url from the table
56-
#query = "SELECT * FROM node_uptime WHERE remote_url = '';"
57-
c = db.cursor()
58-
#c.execute(query)
59-
#nodes_without_remote_url = c.fetchall()
60-
#print(nodes_without_remote_url)
61-
for n in NodeData:
65+
def get_remote_url(n):
6266
address = n['node_address']
63-
64-
# Check if the node already has a remote_url in the table
65-
#if any(node['node_address'] == address for node in nodes_without_remote_url):
66-
# continue
67-
68-
# Retrieve remote_url from the table for nodes that have it stored
67+
#print(address)
6968
query = f"SELECT remote_url FROM node_uptime WHERE node_address = '{address}';"
69+
connection = self.local_db_pool.connection()
70+
#c = db.cursor()
71+
c = connection.cursor()
7072
c.execute(query)
7173
result = c.fetchone()
72-
#print(result['remote_url'])
73-
if not result['remote_url']:
74-
74+
#print(result)
75+
connection.close()
76+
if not result[0]:
7577
endpoint = APIURL + '/sentinel/nodes/' + address
76-
remote_url = result['remote_url']
78+
remote_url = ''
7779
print(f"Getting remote_url of: {address}")
7880
sys.stdout.flush()
7981

@@ -82,59 +84,21 @@ def get_remote_url_of_node(self, db, NodeData):
8284
remote_url = r.json()['node']['remote_url']
8385
except Exception as e:
8486
print(str(e))
85-
continue
86-
#print(f"{remote_url}")
87+
return n['node_address'], None
8788
else:
88-
remote_url = result['remote_url']
89-
90-
NodeRemoteURL['address'].append(n['node_address'])
91-
NodeRemoteURL['url'].append(remote_url)
92-
93-
#print(NodeRemoteURL)
94-
95-
return NodeRemoteURL
96-
97-
'''
98-
def check_uptime(self, NodeRemoteURLs):
99-
k=0
89+
remote_url = result[0]
90+
91+
return n['node_address'], remote_url
10092

101-
NodeUptimeBoolean = {'address' : [], 'up' : []}
93+
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
94+
futures = [executor.submit(get_remote_url, n) for n in NodeData]
95+
results = [future.result() for future in futures]
10296

97+
for address, remote_url in results:
98+
NodeRemoteURL['address'].append(address)
99+
NodeRemoteURL['url'].append(remote_url)
103100

104-
for n in NodeRemoteURLs['address']:
105-
url = NodeRemoteURLs['url'][k]
106-
parsed_url = urlparse(url)
107-
netloc = parsed_url.netloc
108-
#host, port = urlparse(url).netloc.split(":")
109-
#print(f"host: {host}, port: {port}")
110-
#print("Checking if up: ", end='')
111-
112-
if '[' in netloc and ']' in netloc:
113-
host = netloc.split(']', 1)[0][1:]
114-
115-
if ':' in netloc.split(']', 1)[1]:
116-
port = int(netloc.split(']', 1)[1].split(':', 1)[1])
117-
else:
118-
port = None
119-
else:
120-
parts = netloc.split(':')
121-
if len(parts) > 2:
122-
raise ValueError("Invalid URL format")
123-
host = parts[0]
124-
if len(parts) == 2:
125-
port = int(parts[1])
126-
else:
127-
port = None
128-
129-
130-
up = self.check_socket(host, int(port))
131-
#print(up)
132-
NodeUptimeBoolean['address'].append(n)
133-
NodeUptimeBoolean['up'].append(up)
134-
k += 1
135-
136-
return NodeUptimeBoolean
137-
'''
101+
return NodeRemoteURL
138102

139103
def check_uptime(self, NodeRemoteURLs):
140104
k = 0
@@ -162,10 +126,10 @@ def check_uptime_for_node(n, url):
162126

163127
up = self.check_socket(host, int(port) if port else None)
164128
result = (n, up)
165-
print(result)
129+
#print(result)
166130
return result
167131

168-
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
132+
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
169133
futures = []
170134

171135
for n in NodeRemoteURLs['address']:
@@ -194,48 +158,43 @@ def check_socket(self, host, port):
194158
return False
195159

196160

197-
def update_node_uptime(self, node, NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolean):
198-
try:
199-
index = NodeUptimeBoolean['address'].index(node['node_address'])
200-
rindex = NodeRemoteURLs['address'].index(node['node_address'])
201-
remote_url = NodeRemoteURLs['url'][rindex]
202-
except Exception as e:
203-
print(node)
204-
print(str(e))
205-
return
206-
207-
tries = node['tries'] + 1
208-
if NodeUptimeBoolean['up'][index]:
209-
success = node['success'] + 1
210-
else:
211-
success = node['success']
212-
success_rate = round(float(success/tries),3)
213-
214-
query = 'UPDATE node_uptime SET remote_url = "%s", tries = %d, success = %d, success_rate = "%.3f" WHERE node_address = "%s";' % (remote_url,
215-
tries,
216-
success,
217-
success_rate,
218-
node['node_address'])
161+
def UpdateNodeUptimeTable(self, db, NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolean):
219162

220-
#print(query)
221-
#c = db.cursor()
222-
#c.execute(query)
223-
#db.commit()
224-
with self.lock:
225-
connection = self.db_pool.connection()
226-
cursor = connection.cursor()
227-
cursor.execute(query)
228-
connection.commit()
229-
connection.close()
230-
231-
def UpdateNodeUptimeTable(self, NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolean):
232-
with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
233-
futures = []
234-
for node in NodeUptimeData:
235-
futures.append(executor.submit(self.update_node_uptime, node, NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolean))
236-
for future in concurrent.futures.as_completed(futures):
237-
future.result()
163+
query = 'UPDATE node_uptime SET remote_url = "%s", tries = %s, success = %s, success_rate = "%s" WHERE node_address = "%s";'
164+
update_data = []
165+
for node in NodeUptimeData:
166+
try:
167+
index = NodeUptimeBoolean['address'].index(node['node_address'])
168+
rindex = NodeRemoteURLs['address'].index(node['node_address'])
169+
remote_url = NodeRemoteURLs['url'][rindex]
170+
except Exception as e:
171+
print(node)
172+
print(str(e))
173+
return
238174

175+
tries = node['tries'] + 1
176+
if NodeUptimeBoolean['up'][index]:
177+
success = node['success'] + 1
178+
else:
179+
success = node['success']
180+
success_rate = round(float(success/tries),3)
181+
182+
update_data.append((remote_url,
183+
tries,
184+
success,
185+
success_rate,
186+
node['node_address']))
187+
188+
if len(update_data) == 200:
189+
# Use if not running from a remote server and comment the line after the next one.
190+
#connection = self.local_db_pool.connection()
191+
connection = self.db_pool.connection()
192+
cursor = connection.cursor()
193+
cursor.executemany(query, update_data)
194+
connection.commit()
195+
connection.close()
196+
update_data.clear()
197+
239198
if __name__ == "__main__":
240199
NodeUptimeClass = UpdateNodeUptime()
241200
db = NodeUptimeClass.connDB()
@@ -262,7 +221,7 @@ def UpdateNodeUptimeTable(self, NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolea
262221
print("It took %ss to check if all nodes are connectable" % (time3))
263222

264223
start = timer()
265-
NodeUptimeClass.UpdateNodeUptimeTable(NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolean)
224+
NodeUptimeClass.UpdateNodeUptimeTable(db, NodeUptimeData, NodeRemoteURLs, NodeUptimeBoolean)
266225
end = timer()
267226

268227
time4 = round((end - start),4)

scrtsxx.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
USERNAME = "username"
33
PASSWORD = "password"
44
HOST = "localhost"
5+
RHOST = "foo.bar" # edit if using remote query
56
PORT = 3306

0 commit comments

Comments
 (0)