-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcoordinator.cc
More file actions
executable file
·163 lines (149 loc) · 6.5 KB
/
coordinator.cc
File metadata and controls
executable file
·163 lines (149 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstring>
#include <fstream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "coordinator.h"
// Comparator for std::priority_queue over (scheduler, task count) pairs.
// priority_queue keeps the "greatest" element at top(); because this returns
// true when rhs has the larger count, top() is the entry with the MOST tasks.
struct MaxHeap {
  bool operator()(const std::pair<Scheduler*, int>& lhs, const std::pair<Scheduler*, int>& rhs) const {
    return rhs.second > lhs.second;
  }
};
// Comparator for std::priority_queue over (scheduler, task count) pairs.
// Returns true when rhs has the smaller count, so top() is the entry with the
// FEWEST tasks.
struct MinHeap {
  bool operator()(const std::pair<Scheduler*, int>& lhs, const std::pair<Scheduler*, int>& rhs) const {
    return rhs.second < lhs.second;
  }
};
// Creates one Scheduler per two hardware threads, always at least one.
// std::thread::hardware_concurrency() may return 0 when the value is not
// computable, and halving 1 also yields 0. Zero schedulers would make submit()
// index an empty vector and take `% 0`, and make balanceLoad() divide by zero,
// so the count is clamped to 1.
Coordinator::Coordinator() {
  int max_threads = static_cast<int>(std::thread::hardware_concurrency() / 2);
  if (max_threads < 1) {
    max_threads = 1;
  }
  logger.info("Creating %d schedulers", max_threads);
  for (int i = 0; i < max_threads; ++i) {
    // Each scheduler gets its index and the total count; ownership stays with
    // this Coordinator (released in the destructor).
    this->schedulers.push_back(new Scheduler(i, max_threads));
  }
}
// Releases every owned Scheduler, newest first. Each pointer is removed from
// `schedulers` before it is deleted, so the vector never holds a freed pointer.
Coordinator::~Coordinator() {
  while (!schedulers.empty()) {
    Scheduler* const victim = schedulers.back();
    schedulers.pop_back();
    delete victim;
  }
}
// Moves work from the busiest peer scheduler to `recipient_scheduler`.
//
// The donor is the peer (id != recipient's id) with the highest
// current_task_count. It gives min(average, donor_count - average,
// MAX_TASKS_TO_STEAL) tasks, where `average` is ceil(peer_total /
// schedulers.size()).
//
// Returns the number of tasks handed to the donor's submission queue, or 0
// when no peer has a surplus. Previously the donor pointer was left
// uninitialized when every peer was idle, and a non-positive computed value
// could be returned.
int Coordinator::stealTasks(Scheduler* recipient_scheduler) {
  int total_tasks = 0;
  int donor_task_count = 0;
  Scheduler* donor_scheduler = nullptr;  // stays null if no peer has any tasks
  for (auto &scheduler : schedulers) {
    if (scheduler->id == recipient_scheduler->id) continue;
    const int current_task_count = scheduler->current_task_count;
    total_tasks += current_task_count;
    if (donor_task_count < current_task_count) {
      donor_task_count = current_task_count;
      donor_scheduler = scheduler;
    }
  }
  // NOTE(review): the recipient's own tasks are excluded from the total but it
  // is counted in the divisor, i.e. the average is computed as if the
  // recipient were idle. Preserved as-is; confirm this is intended.
  const int average_tasks_ceil =
      static_cast<int>(std::ceil(static_cast<double>(total_tasks) / schedulers.size()));
  const int max_acceptable_donations = average_tasks_ceil;
  int curr_donated_tasks = std::min(max_acceptable_donations, donor_task_count - average_tasks_ceil);
  curr_donated_tasks = std::min(curr_donated_tasks, MAX_TASKS_TO_STEAL);
  if (donor_scheduler == nullptr || curr_donated_tasks <= 0) {
    return 0;
  }
  donor_scheduler->submitToSubmissionQueue(curr_donated_tasks, recipient_scheduler);
  logger.trace("Self balancing total_tasks: %d, average_tasks_ceil: %d curr_donated_tasks: %d", total_tasks, average_tasks_ceil, curr_donated_tasks);
  return curr_donated_tasks;
}
void Coordinator::balanceLoad() {
int total_tasks = 0;
for (auto &scheduler : schedulers) {
int current_task_count = scheduler->current_task_count;
total_tasks += current_task_count;
}
int average_tasks_ceil = std::ceil((total_tasks * 1.0) / schedulers.size());
std::priority_queue<std::pair<Scheduler*, int>, std::vector<std::pair<Scheduler*, int>>, MaxHeap> doner_schedulers;
std::priority_queue<std::pair<Scheduler*, int>, std::vector<std::pair<Scheduler*, int>>, MinHeap> recepient_schedulers;
for (auto &scheduler : schedulers) {
int current_task_count = scheduler->current_task_count;
if (current_task_count > average_tasks_ceil) {
doner_schedulers.push({scheduler, current_task_count});
} else if (current_task_count < average_tasks_ceil) {
recepient_schedulers.push({scheduler, current_task_count});
}
}
logger.trace("Periodic balancing total_tasks: %d, average_tasks_ceil: %d, doner_schedulers_size: %d", total_tasks, average_tasks_ceil, doner_schedulers.size());
int total_donated_tasks = 0;
while (!recepient_schedulers.empty()) {
Scheduler* scheduler = recepient_schedulers.top().first;
int current_task_count = recepient_schedulers.top().second;
recepient_schedulers.pop();
int max_acceptable_donations = std::min(MAX_TASKS_TO_STEAL, average_tasks_ceil - current_task_count);
while (!doner_schedulers.empty() && max_acceptable_donations > 0) {
Scheduler *doner_scheduler = doner_schedulers.top().first;
int doner_current_task_count = doner_schedulers.top().second;
if (doner_current_task_count < average_tasks_ceil || total_donated_tasks == MAX_TASKS_TO_STEAL) {
doner_schedulers.pop();
total_donated_tasks = 0;
continue;
}
int curr_donated_tasks = std::min(max_acceptable_donations, doner_current_task_count - average_tasks_ceil);
curr_donated_tasks = std::min(curr_donated_tasks, MAX_TASKS_TO_STEAL);
if (curr_donated_tasks > 0) {
max_acceptable_donations = max_acceptable_donations - curr_donated_tasks;
doner_scheduler->submitToOwnerSubmissionQueue(curr_donated_tasks, scheduler);
doner_schedulers.pop();
doner_schedulers.push({doner_scheduler, doner_current_task_count - curr_donated_tasks});
total_donated_tasks += curr_donated_tasks;
}
}
}
}
// Hands `task` to the scheduler selected by the round-robin cursor
// `next_scheduler_id`. When ENABLE_LOAD_BALANCE_METRICS is defined the cursor
// never moves, so every task lands on the same scheduler and the balancer has
// to spread the work.
void Coordinator::submit(Task* task) {
  Scheduler* target = schedulers[next_scheduler_id];
  target->submit(task);
#ifndef ENABLE_LOAD_BALANCE_METRICS
  // Advance the cursor, wrapping back to the first scheduler.
  next_scheduler_id = (next_scheduler_id + 1) % schedulers.size();
#endif
}
void Coordinator::start() {
std::vector<std::thread> threads;
for (auto &scheduler : schedulers) {
scheduler->setCoordinator(this);
threads.emplace_back([&scheduler] {
try {
logger.info("Started\n");
scheduler->start();
} catch (const std::runtime_error& e) {
logger.error("Scheduler %d caught runtime error: %s", scheduler->id, e.what());
} catch (const std::exception& e) {
logger.error("Scheduler %d caught exception: %s", scheduler->id, e.what());
} catch (...) {
logger.error("Scheduler %d caught unknown exception", scheduler->id);
}
});
}
threads.emplace_back([this] {
try {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distribution(500, 1500);
while(true) {
std::this_thread::sleep_for(std::chrono::milliseconds(distribution(gen)));
balanceLoad();
}
} catch (const std::runtime_error& e) {
logger.error("Load balancing deamon caught runtime error: %s", e.what());
} catch (const std::exception& e) {
logger.error("Load balancing deamon caught exception: %s", e.what());
} catch (...) {
logger.error("Load balancing deamon caught unknown exception");
}
});
for (std::thread& thread : threads) {
thread.join();
}
}
void Coordinator::stop() {
for (auto &scheduler : schedulers) {
scheduler->stop();
}
}