forked from rennancl/C04
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path run.py
More file actions
170 lines (127 loc) · 4.47 KB
/
run.py
File metadata and controls
170 lines (127 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import multiprocessing
import os
import shutil
import signal
import socket
import subprocess
import sys
import time
from crawlers.crawler_manager import file_descriptor_process
import crawling_utils.crawling_utils as crawling_utils
# GLOBAL VARIABLES
# State shared between run() and the Pool callbacks (signal_done /
# signal_stop) in the parent process.
# Set to True by signal_stop() when one of the pooled functions raises.
stop_processes = False
# The exception handed to signal_stop(); run() re-raises it.
process_exception = None
# Number of pooled functions still running; decremented by signal_done().
n_processes_running = 0
# Guards n_processes_running.
global_lock = multiprocessing.Lock()
# END GLOBAL VARIABLES
# FUNCTIONS THAT SHOULD BE EXECUTED BY PROCESSES
# Intended to equal the number of functions launched by run().
# NOTE(review): run()'s list currently has four active entries (a fifth,
# run_file_downloader, is commented out) — keep these in sync.
N_FUNCTIONS = 5
def wait_for_port(port, host="127.0.0.1", interval=1):
    """Block until a TCP connection to ``host:port`` succeeds.

    Used to wait until another service is accepting connections.

    Args:
        port: TCP port to probe.
        host: Address to probe; defaults to the local loopback address.
        interval: Seconds to sleep between failed attempts.
    """
    location = (host, port)
    while True:
        # Use a fresh socket per attempt and always close it. The previous
        # version opened a new socket after each failure without closing the
        # old one, leaking one descriptor per retry.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            if probe.connect_ex(location) == 0:
                return
        time.sleep(interval)
def run_django():
    """Wait for port 9092, record the Django port, then run ``runserver``.

    The command-line arguments after the script name are forwarded unchanged
    to ``manage.py runserver``. If a first argument is given, it may be
    ``port`` or ``addr:port``, and its port is written to
    ``./crawlers/port.py`` as ``PORT=<port>``. Otherwise ``PORT=8000`` is
    written.
    """
    def _reap_child(_signum, _frame):
        # Reap an exited child process (presumably to avoid zombies).
        # subprocess.run() may already have reaped it, in which case
        # os.wait() raises ChildProcessError. That must not escape the
        # signal handler and abort this function.
        try:
            os.wait()
        except ChildProcessError:
            pass

    signal.signal(signal.SIGCHLD, _reap_child)
    # Presumably the Kafka broker (9092 is Kafka's default port) — confirm.
    wait_for_port(9092)

    # Get the port from the CLI parameter and store it for the module used
    # on spider_closed().
    cli_args = sys.argv[1:]
    if cli_args:
        # runserver accepts "port" as well as "addr:port" (incl. "[::1]:port").
        # The port is always the text after the last colon, or the whole
        # argument when there is no colon.
        cli_port = cli_args[0].rpartition(":")[2]
    else:
        cli_port = 8000
    with open("./crawlers/port.py", "w") as port_file:
        port_file.write("PORT=" + str(cli_port))

    # Run Django with the same interpreter as this script, forwarding the
    # CLI parameters.
    subprocess.run([sys.executable, "manage.py", "runserver"] + cli_args)
def run_zookeeper():
    """Run the ZooKeeper start script from the Kafka distribution until it exits.

    The server uses ``config/zoo.properties``. Its stdout/stderr are appended,
    line-buffered, to ``crawlers/log/zookeeper.out`` and ``zookeeper.err``.
    """
    # Presumably ensures the log directory exists — see crawling_utils.
    crawling_utils.check_file_path("crawlers/log/")
    # Both log files are closed when the server exits (previously leaked).
    # The script runs inside the Kafka directory via cwd= instead of
    # os.chdir(), so this worker process keeps its own working directory.
    # The child's environment is the same as before.
    with open("crawlers/log/zookeeper.out", "a", buffering=1) as out, \
            open("crawlers/log/zookeeper.err", "a", buffering=1) as err:
        subprocess.run(
            ["bin/zookeeper-server-start.sh", "config/zoo.properties"],
            cwd="kafka_2.13-2.4.0",
            stdout=out,
            stderr=err,
        )
def run_kafka():
    """Wait for port 2181, then run the Kafka start script until it exits.

    The broker uses ``config/server.properties`` with ``log.dirs`` overridden
    to ``kafka-logs``. Its stdout/stderr are appended, line-buffered, to
    ``crawlers/log/kafka.out`` and ``kafka.err``.
    """
    # Presumably ZooKeeper (started by run_zookeeper) — 2181 is its default.
    wait_for_port(2181)
    # Presumably ensures the log directory exists — see crawling_utils.
    crawling_utils.check_file_path("crawlers/log/")
    # Both log files are closed when the broker exits (previously leaked).
    # cwd= replaces os.chdir(), so the relative paths below, including
    # log.dirs=kafka-logs, still resolve inside the Kafka directory. That is
    # where run() deletes kafka-logs from.
    with open("crawlers/log/kafka.out", "a", buffering=1) as out, \
            open("crawlers/log/kafka.err", "a", buffering=1) as err:
        subprocess.run(
            [
                "bin/kafka-server-start.sh",
                "config/server.properties",
                "--override",
                "log.dirs=kafka-logs",
            ],
            cwd="kafka_2.13-2.4.0",
            stdout=out,
            stderr=err,
        )
def runn_file_descriptor():
    """Pool entry point: run the crawler manager's file-descriptor process.

    The (misspelled) name is kept unchanged because run() refers to it.
    """
    entry_point = file_descriptor_process
    entry_point()
# END FUNCTIONS THAT SHOULD BE EXECUTED BY PROCESSES
def init_process():
    """Pool worker initializer: make the worker ignore SIGINT (Ctrl-C).

    Only the parent reacts to Ctrl-C; run() then tears the pool down.
    """
    interrupt, ignore = signal.SIGINT, signal.SIG_IGN
    signal.signal(interrupt, ignore)
def signal_done(r):
    """Pool ``callback``: record that one pooled function returned normally.

    Args:
        r: The function's return value (unused).
    """
    # Only the counter is assigned here. The old declaration also named a
    # nonexistent "global_logk" (typo); global_lock is only read, so it
    # needs no global statement.
    global n_processes_running
    # Pool callbacks run in a helper thread of the parent process, while
    # run() reads the counter from the main thread; the lock serialises them.
    with global_lock:
        n_processes_running -= 1
def signal_stop(e):
    """Pool ``error_callback``: remember the failure so run() can re-raise it.

    Args:
        e: The exception raised by the pooled function.
    """
    global stop_processes, process_exception
    # Store the exception BEFORE raising the flag. run() polls the flag from
    # another thread, and with the opposite order it could observe
    # stop_processes=True while process_exception is still None.
    process_exception = e
    stop_processes = True
def run():
    """Start every service function in its own pool worker and supervise them.

    Polls every 5 seconds. Returns once any function returns normally, or
    re-raises the exception if one of them raised. On Ctrl-C it prints a
    message and returns. On every exit path, Kafka and ZooKeeper are stopped
    via their stop scripts, their ``zookeeper`` / ``kafka-logs`` directories
    are removed, and the pool is terminated.
    """
    global n_processes_running

    print("Initializing processes")
    # List functions that will be executed by processes, with their args.
    functions = [
        [run_zookeeper, []],
        [run_kafka, []],
        [run_django, []],
        [runn_file_descriptor, []],
        # [run_file_downloader, []],
    ]
    # Size the pool and the running counter from the list itself, rather
    # than from the separate N_FUNCTIONS constant, so they cannot disagree.
    n_functions = len(functions)
    kafka_dir = "kafka_2.13-2.4.0"

    pool = multiprocessing.Pool(
        processes=n_functions,
        initializer=init_process,
    )
    with global_lock:
        n_processes_running = n_functions
    for func, args in functions:
        pool.apply_async(
            func=func,
            args=args,
            callback=signal_done,
            error_callback=signal_stop,
        )

    try:
        while True:
            # Read the counter under the lock, but never sleep while holding
            # it (that would block signal_done in the callback thread).
            with global_lock:
                a_process_ended = n_processes_running != n_functions
            if a_process_ended:
                print("A process ended, terminating processes...")
                break
            if stop_processes and process_exception is not None:
                # Re-raise the exception raised by one of the processes.
                raise process_exception
            time.sleep(5)
    except KeyboardInterrupt:
        # Workers ignore SIGINT (init_process); only the parent reacts.
        print("KeyboardInterrupt, terminating processes...")
    except Exception:
        print("A process failed, terminating processes...")
        raise
    finally:
        try:
            # Stop Kafka and ZooKeeper, then delete their on-disk state.
            # cwd= replaces os.chdir() so this process's cwd is untouched.
            subprocess.run(["bin/kafka-server-stop.sh"], cwd=kafka_dir)
            subprocess.run(["bin/zookeeper-server-stop.sh"], cwd=kafka_dir)
            for state_dir in ("zookeeper", "kafka-logs"):
                try:
                    shutil.rmtree(os.path.join(kafka_dir, state_dir))
                except FileNotFoundError:
                    # E.g. the service never started; nothing to remove.
                    pass
        finally:
            # Always tear the workers down, even if the cleanup above failed.
            pool.terminate()
            pool.join()
# Script entry point: launch and supervise all services.
if __name__ == "__main__":
    run()