-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrfmpi.cpp
More file actions
130 lines (115 loc) · 4.01 KB
/
rfmpi.cpp
File metadata and controls
130 lines (115 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include "rfmpi.h"

#include <cmath>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <stdexcept>
#include <vector>
void execute_main(const int process_count, const double sample_probability, vvd& trainingData, vvd& testingData, unsigned int numTrees) {
unsigned int child_process_count = process_count - 1;
cout << "Master: Child Process: " << child_process_count << ", Number of Trees:"<< numTrees * child_process_count << ", Sample Probability: " << sample_probability << "%" << endl;
int sample_probability_int = static_cast<int>(RAND_MAX * sample_probability);
// Buffers.
unsigned int col_count = trainingData[0].size();
double row_buffer[col_count];
cout << "Master: Sending training data to children..." << endl;
for (size_t i = 0; i < trainingData.size(); i++) {
copy(trainingData[i].begin(), trainingData[i].end(), row_buffer);
// Send to child.
for ( unsigned int child_rank = 0; child_rank < child_process_count; ++child_rank ) {
if ( rand() < sample_probability_int ) {
MPI_Send(
&row_buffer,
col_count,
MPI_DOUBLE,
child_rank,
MessageTag::RowBuffer,
MPI_COMM_WORLD);
}
}
}
// Stop the loading.
// Sending stop signal for stoping sending training data to children
row_buffer[0] = numeric_limits<double>::quiet_NaN();
for (unsigned int child_rank = 0; child_rank < child_process_count; ++child_rank ) {
// Send to child.
MPI_Send(
&row_buffer,
col_count,
MPI_DOUBLE,
child_rank,
MessageTag::RowBuffer,
MPI_COMM_WORLD );
}
cout << "Master: Sending finished." << endl;
// Grab the trees.
MPI_Status status;
vvi allPredictedClassLabels; // Store all the predicted class label result of the randomforest
vi predictedClassLabels;
unsigned int row_count = testingData.size();
unsigned int buffer_size = row_count * numTrees;
int label_buffer[buffer_size];
cout << "Master: Waiting for result of predicted class labels from children." << endl;
for ( unsigned int child_rank = 0; child_rank < child_process_count; ++child_rank ) {
cout << "Master: Waiting on slave " << child_rank << "..." << endl;
// Wait on child.
MPI_Recv(
&label_buffer,
buffer_size,
MPI_INT,
child_rank,
MessageTag::TreeFinished,
MPI_COMM_WORLD,
&status );
for (size_t i = 0; i < numTrees; i++) {
predictedClassLabels.assign(label_buffer + i * row_count, label_buffer + (i + 1) * row_count);
allPredictedClassLabels.push_back(predictedClassLabels);
predictedClassLabels.clear();
}
}
cout << "Master: Loaded all forests ! Classifying..." << endl;
// Load testing data.
allRandomForest(allPredictedClassLabels, testingData);
cout << "Master: Finished!\n" << endl;
}
void execute_child(const unsigned int parent_rank, const unsigned int rank, vvd& testingData, unsigned int numTrees) {
cout << testingData.size() << endl;
unsigned int col_count = testingData[0].size();
MPI_Status status;
//cout << "Slave [" << rank << "] online: [BD: " << bootstrap_divisor << ", SK: " << split_keys_per_node << ", TPF: " << trees_per_forest << "]" << endl;
cout << "Slave [" << rank << "]: Receiving the trainingData" << endl;
double buffer[col_count];
vd data;
buffer[0] = 0.0;
vvd trainingData;
while ( true ) {
MPI_Recv(
&buffer,
col_count,
MPI_DOUBLE,
parent_rank,
MessageTag::RowBuffer,
MPI_COMM_WORLD,
&status );
// Stop on NaN.
if ( buffer[0] != buffer[0] ) {
break;
}
// Add elements to data vector.
data.assign(buffer, buffer + col_count);
trainingData.push_back(data);
data.clear();
}
cout << "Slave [" << rank << "]: Growing tree..." << endl;
vvi predictedClassLabels;
randomForest(numTrees, trainingData, testingData, predictedClassLabels);
unsigned int row_count = predictedClassLabels[0].size();
unsigned int buffer_size = numTrees * row_count;
int label_buffer[buffer_size];
for (size_t i = 0; i < numTrees; i++) {
copy(predictedClassLabels[i].begin(), predictedClassLabels[i].end(), label_buffer + i * row_count);
}
// Send finished signal.
MPI_Send(
&label_buffer,
buffer_size,
MPI_INT,
parent_rank,
MessageTag::TreeFinished,
MPI_COMM_WORLD);
cout << "Slave [" << rank << "]: Finished!" << endl;
}