-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathExample_Cpp_Project_Multi_thread_2.cpp
More file actions
364 lines (306 loc) · 14.3 KB
/
Example_Cpp_Project_Multi_thread_2.cpp
File metadata and controls
364 lines (306 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
// Alkinoos Sarioglou - Year 3
// ID: 10136315
// Concurrent Systems Assignment
// 18/11/2019
// Pre-processor directives
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <map>
#include <random>
#include <chrono>
#include <vector>
#include <iterator>
using namespace std;
// ---------------------------------------------------------------------------
// Compile-time configuration
// ---------------------------------------------------------------------------
constexpr int MAX_NUM_OF_CHAN = 6;     // how many AdcInputChannels are created
constexpr int MAX_NUM_OF_THREADS = 6;  // how many worker threads are launched
constexpr int DATA_BLOCK_SIZE = 20;    // samples held in each thread's data array
constexpr int NUM_OF_LINKS = 3;        // how many communication links exist

// Slots for the random delays, in milliseconds (range 100-500, i.e. 0.1-0.5 s).
int random_delay_between_100_and_500_i{0};   // delay 1: after the ADC channel is released
int random_delay_between_100_and_500_ii{0};  // delay 2: after a Link has been obtained

// ---------------------------------------------------------------------------
// Shared synchronisation objects
// ---------------------------------------------------------------------------
std::mutex mu;                      // mutex locked by the thread-id map lookup
std::condition_variable cond;       // file-wide condition variable
std::condition_variable cond_link;  // second file-wide condition variable

// Default-constructed locks: they are not associated with any mutex.
std::unique_lock<std::mutex> lock1;
std::unique_lock<std::mutex> map_locker;

// Associates each std::thread::id with the small integer id given to the thread.
std::map<std::thread::id, int> threadIDs;
// One input channel of the ADC.
// The channel is built from an integer and always reports twice that
// integer as its sample value.
class AdcInputChannel {
public:
    // Stores d as the value this channel is based on.
    AdcInputChannel(int d) : currentSample(d) {}

    // Returns the channel's sample: 2 * (value given at construction),
    // computed in double precision.
    double getCurrentSample() {
        const double base = currentSample;
        return 2.0 * base;
    }

private:
    int currentSample;  // integer the sample value is derived from
};  // End Class AdcInputChannel
// Busy/free flag for the ADC.
// This class only stores a boolean; it performs no synchronisation of its
// own, so callers must protect it with a mutex when it is shared.
class Lock {
private:
    bool open;  // true while the ADC is available, false while it is taken

public:
    // A new Lock starts in the "available" state.
    Lock() : open(true) {}

    // Returns true when the ADC is taken (the caller should block) and
    // false when it is available. Every path returns a value.
    bool isADCLocked() const {
        return !open;
    }

    // Marks the ADC as available again.
    void UnlockADC() {
        open = true;
    }

    // Marks the ADC as taken.
    void LockADC() {
        open = false;
    }
};  // End class Lock
// Class for the ADC
class ADC {
private:
Lock theADCLock; // Instance for the Lock/Unlock condition
int sampleChannel; // Channel to sample
std::vector<AdcInputChannel>& adcchannels; // Vector of ADC channels
std::mutex ADC_mu; // Mutex for ADC
public:
// Constructor: initialises a vector of ADCInputChannels
ADC(std::vector<AdcInputChannel>& channels) : adcchannels(channels) {}
// Request access to a channel from the ADC
void requestADC(int c) {
std::unique_lock<std::mutex> lock1(ADC_mu); // Unique_lock for mutual exclusion
if (theADCLock.isADCLocked()) { // ADC locked
std::cout << "ADC is locked, thread " << c << " is about to suspend.." << endl;
while (theADCLock.isADCLocked()) { // Suspend the thread requesting until ADC available
cond.wait(lock1);
}
}
theADCLock.LockADC(); // When ADC available, gain access and lock it
sampleChannel = c; // Channel to sample is equal to the thread ID, each thread different channel
std::cout << "ADC locked by thread " << c << endl;
}
// Sample a channel, get the value
double sampleADC(int id) {
std::unique_lock<std::mutex> lock1(ADC_mu); // Unique_lock for mutual exclusion
std::cout << " sample value from thread " << id << " = " << adcchannels[sampleChannel].getCurrentSample() << endl;
return adcchannels[sampleChannel].getCurrentSample(); // Return the sample value
}
// Release the ADC
void releaseADC(int id) {
std::unique_lock<std::mutex> lock1(ADC_mu); // Unique_lock for mutual exclusion
std::cout << " ADC unlocked by thread " << id << endl;
theADCLock.UnlockADC(); // Unlock the ADC, to allow access by other threads
cond.notify_all(); // Notify the other threads that they can access the ADC now
}
}; // End class ADC
// The Receiver: stores one block of DATA_BLOCK_SIZE samples per thread in a
// MAX_NUM_OF_THREADS x DATA_BLOCK_SIZE table.
// The class does no locking of its own; each call to receiveDataBlock()
// writes only to the row selected by its id argument.
class Receiver {
public:
    // Starts with every table entry set to 0.
    Receiver() {
        for (int row = 0; row < MAX_NUM_OF_THREADS; row++) {
            for (int col = 0; col < DATA_BLOCK_SIZE; col++) {
                dataBlocks[row][col] = 0;
            }
        }
    }

    // Copies DATA_BLOCK_SIZE doubles from data into row id of the table.
    //   id   - row to overwrite; must be in [0, MAX_NUM_OF_THREADS)
    //   data - array holding at least DATA_BLOCK_SIZE values
    // A request with an out-of-range id or a null data pointer is reported
    // on std::cerr and ignored instead of writing outside the table.
    void receiveDataBlock(int id, double data[]) {
        if (id < 0 || id >= MAX_NUM_OF_THREADS || data == nullptr) {
            std::cerr << "Receiver: ignoring data block with invalid id " << id << std::endl;
            return;
        }
        for (int col = 0; col < DATA_BLOCK_SIZE; col++) {
            dataBlocks[id][col] = data[col];
        }
    }

    // Prints the whole table, one line per row: the row label followed by
    // the row's values separated by single spaces.
    void printBlocks() const {
        for (int row = 0; row < MAX_NUM_OF_THREADS; row++) {
            std::cout << "Thread " << row << " ";
            for (int col = 0; col < DATA_BLOCK_SIZE; col++) {
                std::cout << dataBlocks[row][col] << ' ';
            }
            std::cout << std::endl;  // next row starts on a new line
        }
    }

private:
    double dataBlocks[MAX_NUM_OF_THREADS][DATA_BLOCK_SIZE];  // one row per thread
};  // End class Receiver
// A single communication link.
// A Link knows its own number, carries a busy/idle flag and forwards data
// blocks to the Receiver it was constructed with.
class Link {
private:
    bool inUse;            // true while the link is marked busy
    Receiver& myReceiver;  // destination of every block written to this link
    int linkId;            // number identifying this link

public:
    // Creates an idle link numbered linkNum that delivers to receiver r.
    Link (Receiver& r, int linkNum)
        : inUse(false),
          myReceiver(r),
          linkId(linkNum) {}

    // Number given to this link at construction.
    int getLinkId() {
        return linkId;
    }

    // True while the link is marked busy.
    bool isInUse() {
        return inUse;
    }

    // Marks the link as busy.
    void setInUse() {
        inUse = true;
    }

    // Marks the link as idle.
    void setIdle() {
        inUse = false;
    }

    // Hands the block `data`, tagged with the sender's id, to the Receiver.
    void writeToDataLink(int id, double data[]) {
        myReceiver.receiveDataBlock(id, data);
    }
};  // End class Link
// Class for the LinkAccessController
class LinkAccessController {
public:
// Constructor
LinkAccessController(Receiver& r) : myReceiver(r), numOfAvailableLinks(NUM_OF_LINKS) {
for (int i = 0; i < NUM_OF_LINKS; i++) { // Add individual Links as elements in the vector of Communication Links
commsLinks.push_back(Link(myReceiver, i));
}
}
// Returns true is all Links are in use
bool allLinksInUse () {
if (commsLinks[0].isInUse() && commsLinks[1].isInUse() && commsLinks[2].isInUse()) {
return true;
}
else return false;
}
//Request a Communications Link: returns an available Link.
//If none are available, the calling thread is suspended.
Link requestLink(int k) {
std::unique_lock<std::mutex> lock1(LAC_mu); // Mutual Exclusion
if (allLinksInUse()) {
std::cout << "LINKS are locked, thread " << k << " is about to suspend.." << endl;
while (allLinksInUse()) { // Thread suspended until a Link becomes available
cond.wait(lock1);
}
}
if (!(commsLinks[0].isInUse())) { // If Link 1 is free
linkNum = 0;
commsLinks[0].setInUse(); // Lock the Link
std::cout << "Link " << commsLinks[0].getLinkId() << " locked by thread " << k << endl;
}
else if (!(commsLinks[1].isInUse())) {
linkNum = 1;
commsLinks[1].setInUse(); // Lock the Link
std::cout << "Link " << commsLinks[1].getLinkId() << " locked by thread " << k << endl;
}
else if (!(commsLinks[2].isInUse())) {
linkNum = 2;
commsLinks[2].setInUse(); // Lock the Link
std::cout << "Link " << commsLinks[2].getLinkId() << " locked by thread " << k << endl;
}
return commsLinks[linkNum]; // Return the available Link
}
//Release a Communications Link:
void releaseLink(Link& releasedLink, int id) {
std::unique_lock<std::mutex> lock1(LAC_mu); // Mutual Exclusion
number_to_release = releasedLink.getLinkId(); // Which Link is released
commsLinks[number_to_release].setIdle(); // Unlock Link
std::cout << " Link " << number_to_release << " unlocked by thread " << id << endl;
cond.notify_all(); // Notify all other threads
}
private:
Receiver& myReceiver; // Receiver reference
int numOfAvailableLinks;
int linkNum; // Number to identify a Link
int number_to_release; // Number of Link to be released
std::vector<Link> commsLinks; // Vector of Communication Links
std::mutex LAC_mu; // Mutex for LAC
}; // End class LinkAccessController
// Add a pair of thread_ID and int value on the map
void accessmap (int id){
threadIDs.insert(std::make_pair(std::this_thread::get_id(), id));
}
// Looks up the calling thread in the global threadIDs map.
// Returns the integer id stored for the thread, or -1 if the thread has no
// entry. The map is read while holding mu; the guard releases the mutex
// automatically on return, so no explicit unlock is needed.
int search_id_function () {
    std::lock_guard<std::mutex> guard(mu);
    const std::map<std::thread::id, int>::const_iterator it =
        threadIDs.find(std::this_thread::get_id());
    if (it == threadIDs.end()) {
        return -1;  // calling thread was never registered
    }
    return it->second;
}
// Run function executed by each thread.
//   theADC            - the shared ADC to sample from
//   theLinkController - hands out the links used to reach the Receiver
//   id                - small integer identifying this thread; it is also
//                       used as the ADC channel to sample
// For each of the DATA_BLOCK_SIZE slots of its data block the thread:
//   1. obtains the ADC, takes one sample into the next slot, releases the ADC;
//   2. sleeps for a random 100-500 ms;
//   3. obtains a Link, sleeps for another random 100-500 ms, sends the whole
//      block (as filled so far) to the Receiver, and releases the Link.
void run(ADC& theADC, LinkAccessController& theLinkController, int id) {
    accessmap(id);                              // register this thread under id
    const int thread_id = search_id_function(); // read the id back from the map

    // Samples collected by this thread; unfilled slots stay 0.
    double sampleBlock[DATA_BLOCK_SIZE] = {0.0};

    // One generator per thread, created once and seeded from the system's
    // entropy source plus the thread's id, so that threads do not all draw
    // the same delays. Delays are kept in locals: nothing shared between
    // threads is written here.
    std::random_device entropy;
    std::mt19937 gen(entropy() + static_cast<unsigned int>(id));
    std::uniform_int_distribution<int> delay_ms(100, 500);  // 0.1 s - 0.5 s

    for (int i = 0; i < DATA_BLOCK_SIZE; i++) {
        // Take one sample while holding the ADC.
        theADC.requestADC(thread_id);
        sampleBlock[i] = theADC.sampleADC(thread_id);
        theADC.releaseADC(thread_id);

        // Delay 1: after releasing the ADC.
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms(gen)));

        // Obtain a link (blocks while all links are busy).
        Link link = theLinkController.requestLink(thread_id);

        // Delay 2: after obtaining the link.
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms(gen)));

        // Transmit the block to the Receiver, then give the link back.
        link.writeToDataLink(thread_id, sampleBlock);
        theLinkController.releaseLink(link, thread_id);
    }
}  // End of Run function
// Main function
int main() {
// Initialise the ADC channels
std::vector<AdcInputChannel> adcChannels;
for (int k= 0; k < MAX_NUM_OF_CHAN; k++) {
adcChannels.push_back(AdcInputChannel(k)); // Each AdcInputChannel is initialised with a different value
}
// Instantiate the ADC
ADC theadc(adcChannels);
// Instantiate the Receiver
Receiver theReceiver;
// Instantiate the LinkAccessController
LinkAccessController lac(theReceiver);
// Instantiate and start the threads
std::thread the_threads[MAX_NUM_OF_THREADS]; // Array of threads
for (int i = 0; i < MAX_NUM_OF_THREADS; i++) {
// Launch the threads
the_threads[i] = std::thread(run, std::ref(theadc), std::ref(lac),i); // Threads execute run function
}
// Wait for the threads to finish
for (int n = 0; n < MAX_NUM_OF_THREADS; n++) {
the_threads[n].join();
}
// Add new lines to make the result clearly visible
std::cout << endl << endl << endl;
// Print out the data in the Receiver
std::cout << " Table of data in Receiver " << endl; // Header Line 1
std::cout << "-------------------------------------------" << endl; // Header Line 2
theReceiver.printBlocks(); // Print the 2-D array of data in the Receiver
std::cout << endl << endl; // New lines for visibility
cout << "All threads terminated" << endl; // Message for End of Threads
return 0;
} // End of Main function