-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathatomic_integer_counter
More file actions
140 lines (128 loc) · 4.83 KB
/
atomic_integer_counter
File metadata and controls
140 lines (128 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/* An integer counter class using "compare_and_swap (CAS) primitive function"
providing increment, decrement, set and get APIs in an atomic and
thread-safe way. */
/* Atomic CPU instruction */
/*bool compare_and_swap(int* dest, int old_val, int new_val)
*{
* if (*dest == old_val) {
* *dest = new_val;
* return true;
* }
* return false;
*}
*/
#include <atomic>
using namespace std;
class integer_counter {
private:
atomic<int> count;
public:
/* ctor initializes the count to 0 */
integer_counter() : count(0) { }
/* dtor */
~integer_counter();
/* this does not need any kind of synchronization mechanism as it returns
a value of count that is concrete at some instance of time */
int get() {
return count.load();
}
/* we need a synchronization mechanism to ensure that we write the value
onto 'count' concretely once (read, modify and writeback with val passed
by the caller happens once) */
void set(int val) {
int old = this->get();
while (!compare_and_swap(&(this->count), old, val)) {
old = this->get();
}
/* This could also be written in one line as:
* this->count.store(val)
*/
}
/* we need a synchronization mechanism to ensure that counter is
incremented exactly once. */
void increment() {
int old = this->get();
while (!compare_and_swap(&(this->count), old, old + 1)) {
old = this->get();
}
}
/* we need a synchronization mechanism to ensure that counter is
decremented exactly once */
void decrement() {
int old = this->get();
while (!compare_and_swap(&(this->count), old, old - 1)) {
old = this->get();
}
}
};
/* NOTE:
* 1) Probably, we did not need atomic<int> (a normal int would do) if
* both read() and update() (in read(), modify() and update()) of an int
* were guarenteed to be atomic.
* 2) We CANNOT implement increment() as set((this->count) + 1) because it
* ensure we incrementing the count by 1. It only ensures we are setting
* count to a particular example.
* Ex: At an instance value the count is 5, Thread T1 requests increment()
* and Thread T2 also requests increment(). Irrespective of whatever
* order they get executed successfully that value of count should go
* to 7. If we use set((this->count) + 1), we can end up with setting
* the value to 6.
* Similarly we CANNOT implement decrement() as set((this->count) - 1).
* 3) get, set, increment and decrement do their respective operations exactly
* once but do not guarantee the order in which they are done. That is, when
* multiple threads call the APIs in some order, the results seen by these
* threads need not be in that particular order.
* Ex: At an instance value the count is 5, Thread T1 requests set(7),
* Thread T2 requests get(), Thread T3 requests increment(),
* Thread T4 requests decrement().
* T2 not guarenteed to see 7. T2 could get a value of 4 or 5 or 6 or 7
* or 8
*/
===============================================================================
Potential problems of above code in a context subject to periodic highly
intensive concurrent activity (i.e. at peak, hundreds of
threads looking to update a given counter in a short period of time.)
Problem 1:
==========
When subject periodic highly intensive concurrent activity some threads may
starve or take more time than other threads to complete their operation.
The above is LOCK-FREE solution, not a WAIT-FREE solution. Hence there
is no bound on the wait time of a particular thread. Hence throughput/
performance of some threads may be less than that of others.
Solution:
We can use CAS with a BOUNDED_WAIT mechanism. The solution would become
more complicated as we would need to implement CAS as a lock (rather than
directly doing critical section operation inside the CAS). Hence we may
need a sepaerate readers lock and a writers lock for better performance.
Implementation would look like:
bool waiting[n] -- n threads max
bool lock;
/* Acquire lock by thread i */
waiting[i] = true;
while (waiting[i] && !CAS(&lock, 0, 1)) {
/* Wait */
}
waiting[i] = true;
/* Critical section operations */
.
.
.
/* Release lock by thread i */
j = (i+1) % n;
while((j!=i) && !waiting[j]) {
j = (j+1) % n;
}
if(j == i) {
lock = false; /* No threads are, any thread is free to pick a lock */
} else {
waiting[j] = false; /* A waiting thread gets a chance */
}
Problem 2:
=========
Since CAS is a busy wait mechanism, the overall throughput of the system can
suffer. (Many threads will simultaneously try to execute the operation
succcessfully and most of those would fail and thereby unncessarily burning
CPU cycles)
Solution: We can introduce a sleep (usleep) when a thread cannot acquire a
lock. The sleep duration can be set random (within a range) to ensure that
threads do not all sleep and try to acquire the lock at the same time.