-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtasks_pattern.cc~
More file actions
172 lines (139 loc) · 3.95 KB
/
tasks_pattern.cc~
File metadata and controls
172 lines (139 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
* Prototype for simple distributed combinatorial search.
*
* Master node distributes work quanta asynchronously to workers.
*
* 1) In this pattern each node gets a "task" to compute, and
* returns its best match on finishing the task.
*
* 2) Master node distributes the tasks, and keeps track of best match.
*
* 3) Once the optimal match is found, the master sends all nodes an "exit" signal.
*
*/
#include <cstdio>
#include <cstdlib>
#include "stoper.h" //Timing classes.
#include "STL_MACRO.h" //Common macros for shorter c++ code.
#include "MPI_macros.h" //Our common conventions and standard tasks.
// Process-wide shutdown flag. main() resets it to 0, MasterState::Shutdown()
// sets it to 1 and broadcasts it, and NodeState::WorkLoop() keeps running
// while it is 0.
// NOTE(review): the name matches POSIX shutdown(2); if any included header
// pulls in <sys/socket.h> this declaration would conflict -- confirm.
int shutdown;
/**
* - rethink role of broadcasts: what is blocking, and what is not
* - find way to tell workers to shutdown
* - find way for master to wait until some response received
*/
/////////////////////////////////////////////////
/**
 * Base class for the per-process role objects (NodeState / MasterState).
 *
 * Holds the identifiers handed in by the caller and a raw int buffer that
 * the derived classes pass to MPI send/receive calls. The role-specific
 * hooks (Init, WorkLoop, Shutdown) are no-ops here.
 *
 * The object owns com_buffer_, so it is made non-copyable: an implicit copy
 * would lead to the same array being deleted twice.
 */
class State {
 public:
  int kTid;             // id of this process as passed by the caller
  int kDim;             // MasterState iterates worker ids 1 .. kDim-1
  int kBufferSize;      // number of ints in com_buffer_
  int shutdownflag_;    // initialised to 0; not read anywhere in this file
  int *com_buffer_;     // raw storage for information exchange (owned)
  MPI_Status status_;   // scratch status for receive/test calls

  /**
   * @param tid        id stored in kTid
   * @param dim        value stored in kDim
   * @param buff_size  number of ints to allocate for com_buffer_
   */
  State(int tid, int dim, int buff_size)
      : kTid(tid),
        kDim(dim),
        kBufferSize(buff_size),
        shutdownflag_(0),
        // Zero-initialised: derived classes send the whole buffer while only
        // ever writing element 0, so the rest must not be indeterminate.
        com_buffer_(new int[buff_size]()) {}

  virtual ~State() {
    delete[] com_buffer_;
  }

  virtual void Init() {}
  virtual void Shutdown() {}
  virtual void WorkLoop() {}

  /**
   * Broadcasts the single int that k points to, using MASTER as the root.
   *
   * MPI_Bcast is a collective call: every process in WORLD has to call it
   * for the broadcast to complete. On the root *k is sent; on every other
   * process *k is overwritten with the root's value.
   *
   * @param k  address of the int to send (root) or to receive into (others)
   */
  virtual void BcastInt(int *k) {
    // Pass the pointer itself. Passing &k would broadcast the bytes of the
    // local pointer variable and leave the caller's int untouched.
    MPI_Bcast(k, 1, MPI_INT, MASTER, WORLD);
  }

 private:
  // Non-copyable (declared, intentionally not defined).
  State(const State &);
  State &operator=(const State &);
};
/////////////////////////////////////////////////
/**
 * Worker-side role.
 *
 * A worker repeatedly receives one task message from MASTER, sends back a
 * reply, and then takes part in a broadcast of the global `shutdown` flag.
 * It leaves the loop once that flag is non-zero.
 *
 * NOTE(review): because every loop iteration joins a collective broadcast,
 * the master has to issue one matching broadcast per worker iteration;
 * confirm this against the master's dispatch loop.
 */
class NodeState : public State {
 public:
  NodeState(int tid, int dim, int buff_size) : State(tid, dim, buff_size) {}

  /** Announces that this worker is starting. */
  virtual void Init() {
    printf("Worker node %i starting\n", kTid);
  }

  /**
   * Handles one task: blocks until a message from MASTER arrives in
   * com_buffer_, logs its first element together with the current shutdown
   * flag, then replies with kTid + 1000 in element 0.
   */
  void doWork() {
    MPI_Recv(com_buffer_, kBufferSize, MPI_INT, MASTER, TAG, WORLD, &status_);
    printf("Node %i doing work->%i; sdflag=%i\n", kTid, com_buffer_[0],
           shutdown);
    com_buffer_[0] = kTid + 1000;
    MPI_Send(com_buffer_, kBufferSize, MPI_INT, MASTER, TAG, WORLD);
  }

  /** Processes tasks until a broadcast delivers a non-zero shutdown flag. */
  void WorkLoop() {
    while (!shutdown) {
      doWork();
      BcastInt(&shutdown);
    }
  }

  /**
   * Joins the master's shutdown broadcast if this worker has not already
   * received it.
   *
   * WorkLoop() only returns after a broadcast has set `shutdown`, which
   * means the master's shutdown broadcast has already been matched by this
   * process. Calling the collective again unconditionally would post a
   * broadcast that the master never joins.
   */
  virtual void Shutdown() {
    if (!shutdown) {
      BcastInt(&shutdown);
    }
  }
};
/////////////////////////////////////////////////
class MasterState : public State {
MPI_Request *sendrequests_; //must be fixed containers (addresses needed)
int startup;
public:
//must register the nodes
//must have communication (MPI_Request's) for nodes.
MasterState(int ktid, int kdim, int buff_size) :
State(ktid, kdim, buff_size) {
sendrequests_ = new MPI_Request[kdim]; //only "0" is master
startup = 1;
}
~MasterState() {
delete sendrequests_;
}
virtual void Shutdown() {
shutdown = 1;
BcastInt(&shutdown);
}
void AssignWork() {
//initial assign?
for(int worker = 1; worker < kDim; ++worker) {
//printf("testing response from node %i ...\n", worker);
if (!startup) {
int flag = 0;
MPI_Test(&sendrequests_[worker], &flag, &status_);
if (flag) { //send new tasks, if previous finished
printf("Master received from node %i: %i\n", worker, com_buffer_[0]);
}
}
com_buffer_[0] = worker;
MPI_Send(com_buffer_, kBufferSize, MPI_INT, worker, TAG, WORLD);
MPI_Irecv(com_buffer_, kBufferSize, MPI_INT, worker, TAG, WORLD,
&sendrequests_[worker]);
}
startup = 0;
}
void WorkLoop() {
REP(i, 20) {
AssignWork();
//wait for any?
//do not destruct MPI_request...
}
}
};
/////////////////////////////////////////////////
int main (int argc, char *argv[])
{
MY_MPI_INIT;
int kBufferSize = 3;
shutdown = 0;
State *state;
WORKERS_DO {
state = new NodeState(kTid, kDim, kBufferSize);
}
MASTER_DO {
state = new MasterState(kTid, kDim, kBufferSize);
}
state->Init();
MPI_Barrier(WORLD);
state->WorkLoop();
state->Shutdown(); //do they all need to participate, or only MASTER?
printf("Node %i exiting; sdflag=%i\n",kTid,shutdown);
MPI_Barrier(WORLD);
MPI_Finalize();
}