1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.message;
21
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.Condition;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReentrantLock;
28
29 import com.allanbank.mongodb.LockType;
30 import com.allanbank.mongodb.client.Message;
31 import com.allanbank.mongodb.client.callback.ReplyCallback;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 public final class PendingMessageQueue {
96
97
98 public static final long MAX_MESSAGE_ID_MASK = 0x0FFFFFFF;
99
100
101
102
103
104 public static final int MAX_SIZE = (1 << 20);
105
106
107 public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
108
109
110 public static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
111
112
113 private static final int SPIN_ITERATIONS = 10000;
114
115
116 private final Condition myCondition;
117
118
119 private final Lock myLock;
120
121
122 private final LockType myLockType;
123
124
125 private final AtomicInteger myLooped;
126
127
128 private final int myMask;
129
130
131 private final PendingMessage[] myQueue;
132
133
134
135
136
137
138
139
140 private final AtomicInteger myReadyBeforePosition;
141
142
143
144
145
146
147
148
149 private final AtomicInteger myReservePosition;
150
151
152
153
154
155
156
157
158
159
160
161
162 private volatile int myTakePosition;
163
164
165 private final AtomicInteger myWaiting;
166
167
168
169
170
171
172
173
174
175 public PendingMessageQueue(final int size, final LockType lockType) {
176 int power = size;
177 if (MAX_SIZE < size) {
178 power = MAX_SIZE;
179 }
180 else if (Integer.bitCount(size) != 1) {
181
182 power = 1;
183 while ((power < size) && (power != 0)) {
184 power <<= 1;
185 }
186 }
187
188 myLockType = lockType;
189 myQueue = new PendingMessage[power];
190 for (int i = 0; i < myQueue.length; ++i) {
191 myQueue[i] = new PendingMessage(0, null);
192 }
193 myMask = (power - 1);
194
195 myLooped = new AtomicInteger(0);
196 myTakePosition = -1;
197 myReadyBeforePosition = new AtomicInteger(0);
198 myReservePosition = new AtomicInteger(0);
199 myWaiting = new AtomicInteger(0);
200
201 myLock = new ReentrantLock();
202 myCondition = myLock.newCondition();
203 }
204
205
206
207
208
209
210 public int capacity() {
211 return myQueue.length - 1;
212 }
213
214
215
216
217
218
219
220 public void drainTo(final List<PendingMessage> pending) {
221 PendingMessage pm = new PendingMessage();
222 while (poll(pm)) {
223 pending.add(pm);
224 pm = new PendingMessage();
225 }
226 }
227
228
229
230
231
232
233
234 public boolean isEmpty() {
235 final int take = myTakePosition;
236 final int readyBefore = myReadyBeforePosition.get();
237
238 return (readyBefore == take) || (take < 0);
239 }
240
241
242
243
244
245
246
247
248
249
250
251 public boolean offer(final Message message,
252 final ReplyCallback replyCallback) {
253
254 final int loop = myLooped.get();
255 final int reserve = offer();
256 if (reserve < 0) {
257 return false;
258 }
259
260 final int messageid = toMessageId(loop, reserve);
261
262 myQueue[reserve].set(messageid, message, replyCallback);
263
264 markReady(reserve);
265
266 return true;
267 }
268
269
270
271
272
273
274
275
276
277 public boolean offer(final PendingMessage pendingMessage) {
278 final int reserve = offer();
279 if (reserve < 0) {
280 return false;
281 }
282
283 myQueue[reserve].set(pendingMessage);
284
285 markReady(reserve);
286
287 return true;
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308 public boolean poll(final PendingMessage copyOut) {
309 boolean result = false;
310 final int take = myTakePosition;
311 if ((myReadyBeforePosition.get() != take) && (take >= 0)) {
312
313
314 copyOut.set(myQueue[take]);
315 myQueue[take].clear();
316 result = true;
317
318 myTakePosition = increment(take);
319 notifyWaiters(false);
320 }
321
322 return result;
323 }
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338 public void put(final Message message, final ReplyCallback replyCallback)
339 throws InterruptedException {
340
341 int loop = myLooped.get();
342 int reserve = offer();
343 if (reserve < 0) {
344
345
346
347
348 try {
349 myWaiting.incrementAndGet();
350 myLock.lock();
351
352 loop = myLooped.get();
353 reserve = offer();
354 while (reserve < 0) {
355 myCondition.await();
356 loop = myLooped.get();
357 reserve = offer();
358 }
359 }
360 finally {
361 myLock.unlock();
362 myWaiting.decrementAndGet();
363 }
364 }
365
366 final int messageid = toMessageId(loop, reserve);
367
368 myQueue[reserve].set(messageid, message, replyCallback);
369
370 markReady(reserve);
371 }
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391 public void put(final Message message, final ReplyCallback replyCallback,
392 final Message message2, final ReplyCallback replyCallback2)
393 throws InterruptedException {
394 int loop = myLooped.get();
395 int reserve = offer2();
396 if (reserve < 0) {
397
398
399 try {
400 myWaiting.incrementAndGet();
401 myLock.lock();
402
403 loop = myLooped.get();
404 reserve = offer2();
405 while (reserve < 0) {
406 myCondition.await();
407 loop = myLooped.get();
408 reserve = offer2();
409 }
410 }
411 finally {
412 myLock.unlock();
413 myWaiting.decrementAndGet();
414 }
415 }
416
417
418
419
420
421 final int messageId1 = toMessageId(loop, reserve);
422 final int messageId2 = toMessageId(loop, reserve + 1);
423
424 final int second = increment(reserve);
425 myQueue[reserve].set(messageId1, message, replyCallback);
426 myQueue[second].set(messageId2, message2, replyCallback2);
427
428 markReady2(reserve);
429 }
430
431
432
433
434
435
436
437
438
439
440
441
442 public void put(final PendingMessage pendingMessage)
443 throws InterruptedException {
444 int reserve = offer();
445 if (reserve < 0) {
446
447
448
449 try {
450 myWaiting.incrementAndGet();
451 myLock.lock();
452
453 reserve = offer();
454 while (reserve < 0) {
455 myCondition.await();
456 reserve = offer();
457 }
458 }
459 finally {
460 myLock.unlock();
461 myWaiting.decrementAndGet();
462 }
463 }
464
465 myQueue[reserve].set(pendingMessage);
466
467 markReady(reserve);
468 }
469
470
471
472
473
474
475 public int size() {
476 final int take = myTakePosition;
477 final int ready = myReadyBeforePosition.get();
478
479 if (take < 0) {
480 return 0;
481 }
482 else if (take <= ready) {
483 return (ready - take);
484 }
485
486 return (myQueue.length - take) + ready;
487 }
488
489
490
491
492
493
494
495
496
497
498 public void take(final PendingMessage copyOut) throws InterruptedException {
499 if (!poll(copyOut)) {
500
501
502 if (myLockType == LockType.LOW_LATENCY_SPIN) {
503 long now = 0;
504 long spinDeadline = 1;
505 long yeildDeadline = 1;
506 while (now < yeildDeadline) {
507 for (int i = 0; i < SPIN_ITERATIONS; ++i) {
508 if (poll(copyOut)) {
509 return;
510 }
511 }
512
513
514 now = System.nanoTime();
515 if (spinDeadline == 1) {
516 spinDeadline = now + SPIN_TIME_NS;
517 yeildDeadline = now + YIELD_TIME_NS;
518
519 }
520 else {
521 if ((spinDeadline < now) && (now < yeildDeadline)) {
522 Thread.yield();
523 }
524 }
525 }
526 }
527
528
529 try {
530 myWaiting.incrementAndGet();
531 myLock.lock();
532
533 while (!poll(copyOut)) {
534 myCondition.await();
535 }
536 }
537 finally {
538 myLock.unlock();
539 myWaiting.decrementAndGet();
540 }
541 }
542 }
543
544
545
546
547
548
549
550
551 protected int increment(final int index) {
552 return ((index + 1) & myMask);
553 }
554
555
556
557
558
559
560
561
562
563
564 protected void markReady(final int index) {
565 final int after = increment(index);
566
567 while (!myReadyBeforePosition.compareAndSet(index, after)) {
568
569
570 Thread.yield();
571 }
572
573
574 if ((index == 0) && (myTakePosition == -1)) {
575 myTakePosition = index;
576 }
577
578 notifyWaiters(false);
579 }
580
581
582
583
584
585
586
587
588
589
590 protected void markReady2(final int index) {
591 final int after = increment(index);
592 final int twoAfter = increment(after);
593
594 while (!myReadyBeforePosition.compareAndSet(index, twoAfter)) {
595
596 Thread.yield();
597 }
598
599
600 if ((index == 0) && (myTakePosition == -1)) {
601 myTakePosition = index;
602 }
603
604
605 notifyWaiters(true);
606 }
607
608
609
610
611
612
613
614
615 protected void notifyWaiters(final boolean all) {
616 if (myWaiting.get() > 0) {
617 try {
618 myLock.lock();
619 if (all) {
620 myCondition.signalAll();
621 }
622 else {
623 myCondition.signal();
624 }
625 }
626 finally {
627 myLock.unlock();
628 }
629 }
630 }
631
632
633
634
635
636
637
638
639 protected int offer() {
640 int result = -1;
641 final int reserve = myReservePosition.get();
642 final int next = increment(reserve);
643 if ((myTakePosition != next)
644 && myReservePosition.compareAndSet(reserve, next)) {
645
646
647 result = reserve;
648
649
650 if (next < reserve) {
651 myLooped.incrementAndGet();
652 }
653 }
654 return result;
655 }
656
657
658
659
660
661
662
663
664
665 protected int offer2() {
666 int result = -1;
667 final int reserve = myReservePosition.get();
668 final int first = increment(reserve);
669 final int second = increment(first);
670 final int take = myTakePosition;
671 if ((take != first) && (take != second)
672 && myReservePosition.compareAndSet(reserve, second)) {
673
674
675 result = reserve;
676
677
678 if (second < reserve) {
679 myLooped.incrementAndGet();
680 }
681 }
682 return result;
683 }
684
685
686
687
688
689
690
691
692
693
694
695
696 private int toMessageId(final int loop, final long reserve) {
697 final long loopOffset = (((long) loop) * myQueue.length);
698 if (loopOffset > MAX_MESSAGE_ID_MASK) {
699 myLooped.compareAndSet(loop, 0);
700 }
701
702 return (int) ((loopOffset + reserve) & MAX_MESSAGE_ID_MASK) + 1;
703 }
704 }