1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.connection.socket;
22
23 import java.util.SortedMap;
24 import java.util.TreeMap;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.atomic.AtomicLongArray;
27 import java.util.concurrent.locks.Condition;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import com.allanbank.mongodb.LockType;
32 import com.allanbank.mongodb.client.message.PendingMessageQueue;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 class Sequence {
48
49
50 private static final int RELEASE_OFFSET = 15 + 7;
51
52
53 private static final int RESERVE_OFFSET = 7;
54
55
56 private static final long YIELD_TIME_NS = PendingMessageQueue.YIELD_TIME_NS;
57
58
59 private final Condition myCondition;
60
61
62 private final Lock myLock;
63
64
65 private final LockType myLockType;
66
67
68 private final AtomicLongArray myPaddedValue = new AtomicLongArray(30);
69
70
71
72
73
74 private final SortedMap<Long, Condition> myWaiters;
75
76
77 private final AtomicInteger myWaiting;
78
79
80
81
82
83
84
85 public Sequence(final long initialValue) {
86 this(initialValue, LockType.MUTEX);
87 }
88
89
90
91
92
93
94
95
96
97 public Sequence(final long initialValue, final LockType lockType) {
98 myPaddedValue.set(RESERVE_OFFSET, initialValue);
99 myPaddedValue.set(RELEASE_OFFSET, initialValue);
100
101 myLockType = lockType;
102
103 myLock = new ReentrantLock(true);
104 myCondition = myLock.newCondition();
105 myWaiting = new AtomicInteger(0);
106 myWaiters = new TreeMap<Long, Condition>();
107 }
108
109
110
111
112
113
114 public int getWaitersCount() {
115 final long reserve = myPaddedValue.get(RESERVE_OFFSET);
116 final long release = myPaddedValue.get(RELEASE_OFFSET);
117
118 return (int) (release - reserve);
119 }
120
121
122
123
124
125
126 public boolean isIdle() {
127 final long reserve = myPaddedValue.get(RESERVE_OFFSET);
128 final long release = myPaddedValue.get(RELEASE_OFFSET);
129 return (reserve == release);
130 }
131
132
133
134
135
136
137
138
139 public boolean noWaiter(final long expectedReserve) {
140 final long reserve = myPaddedValue.get(RESERVE_OFFSET);
141
142 return (reserve == expectedReserve);
143 }
144
145
146
147
148
149
150
151
152
153 public void release(final long expectedValue, final long newValue) {
154 while (!compareAndSetRelease(expectedValue, newValue)) {
155
156
157 Thread.yield();
158 }
159 notifyWaiters();
160 }
161
162
163
164
165
166
167
168
169 public long reserve(final long numberOfMessages) {
170 long current;
171 long next;
172
173 do {
174 current = myPaddedValue.get(RESERVE_OFFSET);
175 next = current + numberOfMessages;
176 }
177 while (!compareAndSetReserve(current, next));
178
179 return current;
180 }
181
182
183
184
185
186
187
188 public void waitFor(final long wanted) {
189 long releaseValue = myPaddedValue.get(RELEASE_OFFSET);
190 while (releaseValue != wanted) {
191 if (myLockType == LockType.LOW_LATENCY_SPIN) {
192 long now = System.nanoTime();
193 final long yeildDeadline = now + YIELD_TIME_NS;
194
195 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
196 while ((now < yeildDeadline) && (releaseValue != wanted)) {
197
198 Thread.yield();
199 now = System.nanoTime();
200 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
201 }
202 }
203
204
205 if (releaseValue != wanted) {
206 final Long key = Long.valueOf(wanted);
207 Condition localCondition = myCondition;
208 try {
209 final int waitCount = myWaiting.incrementAndGet();
210 myLock.lock();
211
212
213 try {
214
215
216
217 if (waitCount > 1) {
218 localCondition = myLock.newCondition();
219 myWaiters.put(key, localCondition);
220 }
221
222 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
223 while (releaseValue != wanted) {
224 localCondition.awaitUninterruptibly();
225 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
226 }
227 }
228 finally {
229 if (localCondition != myCondition) {
230 myWaiters.remove(key);
231 }
232 }
233 }
234 finally {
235 myLock.unlock();
236 myWaiting.decrementAndGet();
237 }
238 }
239 }
240 }
241
242
243
244
245
246
247
248
249
250
251 private boolean compareAndSetRelease(final long expectedValue,
252 final long newValue) {
253 return myPaddedValue.compareAndSet(RELEASE_OFFSET, expectedValue,
254 newValue);
255 }
256
257
258
259
260
261
262
263
264
265
266 private boolean compareAndSetReserve(final long expectedValue,
267 final long newValue) {
268 return myPaddedValue.compareAndSet(RESERVE_OFFSET, expectedValue,
269 newValue);
270 }
271
272
273
274
275 private void notifyWaiters() {
276 if (myWaiting.get() > 0) {
277 try {
278 myLock.lock();
279
280
281 myCondition.signalAll();
282
283
284
285 if (!myWaiters.isEmpty()) {
286 myWaiters.get(myWaiters.firstKey()).signalAll();
287 }
288 }
289 finally {
290 myLock.unlock();
291 }
292 }
293 }
294 }