1 /*
2 * #%L
3 * PendingMessageQueue.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.message;
21
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.Condition;
26 import java.util.concurrent.locks.Lock;
27 import java.util.concurrent.locks.ReentrantLock;
28
29 import com.allanbank.mongodb.LockType;
30 import com.allanbank.mongodb.client.Message;
31 import com.allanbank.mongodb.client.callback.ReplyCallback;
32
33 /**
34 * PendingMessageQueue provides an optimized queue for pending messages inspired
35 * by the Disruptor project.
36 * <p>
37 * To reduce thread contention the queue uses a set of integer values to track
38 * the position of the ready messages (the last message that is ready to be
39 * read), reserve (the first message that can be reserved to be written to), and
40 * the take (the next (first) message to be read). For an infinite queue the
41 * following invariant holds: <blockquote>
42 *
43 * <pre>
44 * <code>
45 * take < readyBefore <= reserve
46 * </code>
47 * </pre>
48 *
49 * </blockquote> To make handling a limited size queue easier the size of the
50 * queue is forced to power of 2 less than {@value #MAX_SIZE}. The roll over can
51 * then be handled with a simple mask operation.
52 * </p>
53 * <p>
54 * Rather than allocate a pending message per request we use an array of
55 * pre-allocated PendingMessages and copy the data into and out of the objects.
56 * this has a net positive effect on object allocation and garbage collection
57 * time at the cost of a longer initialization.
58 * </p>
59 * <p>
60 * Lastly, This queue assumes there is a single consumer of messages. This is
61 * true for the driver's use case but don't copy the code and expect it to work
62 * with multiple consumers. The consumer should use the following basic
63 * structure: <blockquote>
64 *
65 * <pre>
66 * <code>
67 * PendingMessage pm = new {@link PendingMessage}();
68 *
69 * queue.take(pm); // Blocks.
70 * // Handle the message.
71 *
72 * // or
73 *
74 * if( queue.poll(pm) ) { // Non-blocking.
75 * // Handle The Message.
76 * }
77 * </code>
78 * </pre>
79 *
80 * </blockquote>
81 * </p>
82 * <p>
83 * <b>Warning: </b> This class has been carefully tuned for the driver's use
84 * case. Changes should be carefully bench marked and tested. Comments have been
85 * embedded in the source indicating attempted changes and reverts. Due to its
86 * position in the driver subtle changes in this class can cause large changes
87 * in the performance of the driver.
88 * </p>
89 *
90 * @see <a href="http://code.google.com/p/disruptor/">Disruptor Project</a>
91 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
92 * mutated in incompatible ways between any two releases of the driver.
93 * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
94 */
95 public final class PendingMessageQueue {
96
97 /** The mask for constraining the size the message id. */
98 public static final long MAX_MESSAGE_ID_MASK = 0x0FFFFFFF;
99
100 /**
101 * The maximum size of the queue. This it currently 2^20 but must be at most
102 * 2^30 to ensure masking works.
103 */
104 public static final int MAX_SIZE = (1 << 20);
105
106 /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */
107 public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
108
109 /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
110 public static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
111
112 /** Number of times to spin before trying something different. */
113 private static final int SPIN_ITERATIONS = 10000;
114
115 /** The condition used with the queue being full or empty. */
116 private final Condition myCondition;
117
118 /** The mutex used with the queue. */
119 private final Lock myLock;
120
121 /** The lock type to use with the queue. */
122 private final LockType myLockType;
123
124 /** Tracks how many times we have looped through the ring buffer. */
125 private final AtomicInteger myLooped;
126
127 /** The mask being used. */
128 private final int myMask;
129
130 /** The queue of pending messages. */
131 private final PendingMessage[] myQueue;
132
133 /**
134 * The position of the last message that is ready to be taken.
135 * <p>
136 * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the
137 * queue is empty.
138 * </p>
139 */
140 private final AtomicInteger myReadyBeforePosition;
141
142 /**
143 * The position of the next message that can be reserved.
144 * <p>
145 * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the
146 * queue is full.
147 * </p>
148 */
149 private final AtomicInteger myReservePosition;
150
151 /**
152 * The position of the next message that can be taken.
153 * <p>
154 * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the
155 * queue is full.
156 * </p>
157 * <p>
158 * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the
159 * queue is empty.
160 * </p>
161 */
162 private volatile int myTakePosition;
163
164 /** Tracks how many threads are waiting for a message or a space to open. */
165 private final AtomicInteger myWaiting;
166
167 /**
168 * Creates a new PendingMessageQueue.
169 *
170 * @param size
171 * The size of the queue to create.
172 * @param lockType
173 * The lock type to use with the queue.
174 */
175 public PendingMessageQueue(final int size, final LockType lockType) {
176 int power = size;
177 if (MAX_SIZE < size) {
178 power = MAX_SIZE;
179 }
180 else if (Integer.bitCount(size) != 1) {
181 // Find the next larger power of 2.
182 power = 1;
183 while ((power < size) && (power != 0)) {
184 power <<= 1;
185 }
186 }
187
188 myLockType = lockType;
189 myQueue = new PendingMessage[power];
190 for (int i = 0; i < myQueue.length; ++i) {
191 myQueue[i] = new PendingMessage(0, null);
192 }
193 myMask = (power - 1);
194
195 myLooped = new AtomicInteger(0);
196 myTakePosition = -1;
197 myReadyBeforePosition = new AtomicInteger(0);
198 myReservePosition = new AtomicInteger(0);
199 myWaiting = new AtomicInteger(0);
200
201 myLock = new ReentrantLock();
202 myCondition = myLock.newCondition();
203 }
204
205 /**
206 * Returns the size of the queue.
207 *
208 * @return The size of the queue.
209 */
210 public int capacity() {
211 return myQueue.length - 1;
212 }
213
214 /**
215 * Drains the list of pending messages into the provided list.
216 *
217 * @param pending
218 * The list to add all of the pending messages to.
219 */
220 public void drainTo(final List<PendingMessage> pending) {
221 PendingMessage pm = new PendingMessage();
222 while (poll(pm)) {
223 pending.add(pm);
224 pm = new PendingMessage();
225 }
226 }
227
228 /**
229 * Returns true if the queue is empty. e.g., the next take position is the
230 * read before position.
231 *
232 * @return If the queue is empty.
233 */
234 public boolean isEmpty() {
235 final int take = myTakePosition;
236 final int readyBefore = myReadyBeforePosition.get();
237
238 return (readyBefore == take) || (take < 0);
239 }
240
241 /**
242 * Puts a message onto the queue. This method will not block waiting for a
243 * space to add the message.
244 *
245 * @param message
246 * The message to add.
247 * @param replyCallback
248 * The callback for the message to add.
249 * @return True if the message was added, false otherwise.
250 */
251 public boolean offer(final Message message,
252 final ReplyCallback replyCallback) {
253
254 final int loop = myLooped.get();
255 final int reserve = offer();
256 if (reserve < 0) {
257 return false;
258 }
259
260 final int messageid = toMessageId(loop, reserve);
261
262 myQueue[reserve].set(messageid, message, replyCallback);
263
264 markReady(reserve);
265
266 return true;
267 }
268
269 /**
270 * Puts a message onto the queue. This method will not block waiting for a
271 * space to add the message.
272 *
273 * @param pendingMessage
274 * The message to add.
275 * @return True if the message was added, false otherwise.
276 */
277 public boolean offer(final PendingMessage pendingMessage) {
278 final int reserve = offer();
279 if (reserve < 0) {
280 return false;
281 }
282
283 myQueue[reserve].set(pendingMessage);
284
285 markReady(reserve);
286
287 return true;
288 }
289
290 /**
291 * Returns the next message from the queue without blocking. <blockquote>
292 *
293 * <pre>
294 * <code>
295 * PendingMessage pm = new PendingMessage();
296 * if( queue.poll(pm) } {
297 * // Handle the message copied into pm.
298 * }
299 * </code>
300 * </pre>
301 *
302 * </blockquote>
303 *
304 * @param copyOut
305 * The {@link PendingMessage} to copy the pending message into.
306 * @return True if the pending message was updated.
307 */
308 public boolean poll(final PendingMessage copyOut) {
309 boolean result = false;
310 final int take = myTakePosition;
311 if ((myReadyBeforePosition.get() != take) && (take >= 0)) { // Empty,
312 // Not
313 // started?
314 copyOut.set(myQueue[take]);
315 myQueue[take].clear();
316 result = true;
317
318 myTakePosition = increment(take);
319 notifyWaiters(false);
320 }
321
322 return result;
323 }
324
325 /**
326 * Puts a message onto the queue. This method will block waiting for a space
327 * to add the message.
328 *
329 * @param message
330 * The message to add.
331 * @param replyCallback
332 * The callback for the message to add.
333 *
334 * @throws InterruptedException
335 * If the thread is interrupted while waiting for the message.
336 * If thrown the message will not have been enqueued.
337 */
338 public void put(final Message message, final ReplyCallback replyCallback)
339 throws InterruptedException {
340
341 int loop = myLooped.get();
342 int reserve = offer();
343 if (reserve < 0) {
344
345 // Spinning here appears to slow things down.
346
347 // Block.
348 try {
349 myWaiting.incrementAndGet();
350 myLock.lock();
351
352 loop = myLooped.get();
353 reserve = offer();
354 while (reserve < 0) {
355 myCondition.await();
356 loop = myLooped.get();
357 reserve = offer();
358 }
359 }
360 finally {
361 myLock.unlock();
362 myWaiting.decrementAndGet();
363 }
364 }
365
366 final int messageid = toMessageId(loop, reserve);
367
368 myQueue[reserve].set(messageid, message, replyCallback);
369
370 markReady(reserve);
371 }
372
373 /**
374 * Puts two messages onto the queue. This method will block waiting for a
375 * space to add the messages but ensures the messages are in sequence in the
376 * queue.
377 *
378 * @param message
379 * The first message to add.
380 * @param replyCallback
381 * The callback for the first message to add.
382 * @param message2
383 * The second message to add.
384 * @param replyCallback2
385 * The callback for the second message to add.
386 *
387 * @throws InterruptedException
388 * If the thread is interrupted while waiting for the message.
389 * If thrown neither message will have been enqueued.
390 */
391 public void put(final Message message, final ReplyCallback replyCallback,
392 final Message message2, final ReplyCallback replyCallback2)
393 throws InterruptedException {
394 int loop = myLooped.get();
395 int reserve = offer2();
396 if (reserve < 0) {
397
398 // Spinning here appears to slow things down.
399 try {
400 myWaiting.incrementAndGet();
401 myLock.lock();
402
403 loop = myLooped.get();
404 reserve = offer2();
405 while (reserve < 0) {
406 myCondition.await();
407 loop = myLooped.get();
408 reserve = offer2();
409 }
410 }
411 finally {
412 myLock.unlock();
413 myWaiting.decrementAndGet();
414 }
415 }
416
417 // Use reserve + 1 for the second message id since it may have looped
418 // and then the math does not work out causing messageId2 to be lower
419 // than
420 // messageId1, which is bad.
421 final int messageId1 = toMessageId(loop, reserve);
422 final int messageId2 = toMessageId(loop, reserve + 1);
423
424 final int second = increment(reserve);
425 myQueue[reserve].set(messageId1, message, replyCallback);
426 myQueue[second].set(messageId2, message2, replyCallback2);
427
428 markReady2(reserve);
429 }
430
431 /**
432 * Puts a message onto the queue. This method will block waiting for a space
433 * to add the message.
434 *
435 * @param pendingMessage
436 * The message to add.
437 *
438 * @throws InterruptedException
439 * If the thread is interrupted while waiting for the message.
440 * If thrown the message will not have been enqueued.
441 */
442 public void put(final PendingMessage pendingMessage)
443 throws InterruptedException {
444 int reserve = offer();
445 if (reserve < 0) {
446
447 // Spinning here appears to slow things down.
448
449 try {
450 myWaiting.incrementAndGet();
451 myLock.lock();
452
453 reserve = offer();
454 while (reserve < 0) {
455 myCondition.await();
456 reserve = offer();
457 }
458 }
459 finally {
460 myLock.unlock();
461 myWaiting.decrementAndGet();
462 }
463 }
464
465 myQueue[reserve].set(pendingMessage);
466
467 markReady(reserve);
468 }
469
470 /**
471 * Returns the number of messages in the queue.
472 *
473 * @return The number of messages in the queue.
474 */
475 public int size() {
476 final int take = myTakePosition;
477 final int ready = myReadyBeforePosition.get();
478
479 if (take < 0) {
480 return 0;
481 }
482 else if (take <= ready) {
483 return (ready - take);
484 }
485
486 return (myQueue.length - take) + ready;
487 }
488
489 /**
490 * Returns the next message from the queue and will block waiting for a
491 * message.
492 *
493 * @param copyOut
494 * The {@link PendingMessage} to copy the pending message into.
495 * @throws InterruptedException
496 * If the thread is interrupted while waiting for the message.
497 */
498 public void take(final PendingMessage copyOut) throws InterruptedException {
499 if (!poll(copyOut)) {
500
501 // Spin/yeild loop.
502 if (myLockType == LockType.LOW_LATENCY_SPIN) {
503 long now = 0;
504 long spinDeadline = 1;
505 long yeildDeadline = 1;
506 while (now < yeildDeadline) {
507 for (int i = 0; i < SPIN_ITERATIONS; ++i) {
508 if (poll(copyOut)) {
509 return;
510 }
511 }
512
513 // Pause?
514 now = System.nanoTime();
515 if (spinDeadline == 1) {
516 spinDeadline = now + SPIN_TIME_NS;
517 yeildDeadline = now + YIELD_TIME_NS;
518 // First time free pass.
519 }
520 else {
521 if ((spinDeadline < now) && (now < yeildDeadline)) {
522 Thread.yield();
523 }
524 }
525 }
526 }
527
528 // Block.
529 try {
530 myWaiting.incrementAndGet();
531 myLock.lock();
532
533 while (!poll(copyOut)) {
534 myCondition.await();
535 }
536 }
537 finally {
538 myLock.unlock();
539 myWaiting.decrementAndGet();
540 }
541 }
542 }
543
544 /**
545 * Increments the index handling roll-over.
546 *
547 * @param index
548 * The value to increment.
549 * @return The incremented value.
550 */
551 protected int increment(final int index) {
552 return ((index + 1) & myMask);
553 }
554
555 /**
556 * Marks the position as ready by incrementing the ready position to the
557 * provided position. This method uses a spin lock assuming any other
558 * threads will increment the ready position quickly to the position just
559 * before {@code index}.
560 *
561 * @param index
562 * The index of the ready message.
563 */
564 protected void markReady(final int index) {
565 final int after = increment(index);
566
567 while (!myReadyBeforePosition.compareAndSet(index, after)) {
568 // Spinning here slows things down because we know that the other
569 // thread should be runnable. Always Yield.
570 Thread.yield();
571 }
572
573 // Pull take position into the queue.
574 if ((index == 0) && (myTakePosition == -1)) {
575 myTakePosition = index;
576 }
577
578 notifyWaiters(false);
579 }
580
581 /**
582 * Marks the position and the next position as ready by incrementing the
583 * ready position to the provided position + 1. This method uses a spin lock
584 * assuming any other threads will increment the ready position quickly to
585 * the position just before {@code index}.
586 *
587 * @param index
588 * The index of the ready message.
589 */
590 protected void markReady2(final int index) {
591 final int after = increment(index);
592 final int twoAfter = increment(after);
593
594 while (!myReadyBeforePosition.compareAndSet(index, twoAfter)) {
595 // Just keep swimming...
596 Thread.yield();
597 }
598
599 // Pull take position into the queue.
600 if ((index == 0) && (myTakePosition == -1)) {
601 myTakePosition = index;
602 }
603
604 // If someone is waiting let them know we created two messages.
605 notifyWaiters(true);
606 }
607
608 /**
609 * Notifies the waiting threads that the state of the queue has changed.
610 *
611 * @param all
612 * If true then all threads will be woken. Otherwise only a
613 * single thread is woken.
614 */
615 protected void notifyWaiters(final boolean all) {
616 if (myWaiting.get() > 0) {
617 try {
618 myLock.lock();
619 if (all) {
620 myCondition.signalAll();
621 }
622 else {
623 myCondition.signal();
624 }
625 }
626 finally {
627 myLock.unlock();
628 }
629 }
630 }
631
632 /**
633 * Checks if there is remove for another message. If so returns the index of
634 * the message to update. If not return a value less then zero.
635 *
636 * @return The position of the message that can be updated or a value of
637 * less than zero if the queue is full.
638 */
639 protected int offer() {
640 int result = -1;
641 final int reserve = myReservePosition.get();
642 final int next = increment(reserve);
643 if ((myTakePosition != next) /* Full? */
644 && myReservePosition.compareAndSet(reserve, next)) {
645
646 // Got a slot.
647 result = reserve;
648
649 // Check if we looped.
650 if (next < reserve) {
651 myLooped.incrementAndGet();
652 }
653 }
654 return result;
655 }
656
657 /**
658 * Checks if there is remove for another two message. If so returns the
659 * index of the first message to update. If not return a value less then
660 * zero.
661 *
662 * @return The position of the first message that can be updated or a value
663 * of less than zero if the queue is full.
664 */
665 protected int offer2() {
666 int result = -1;
667 final int reserve = myReservePosition.get();
668 final int first = increment(reserve);
669 final int second = increment(first);
670 final int take = myTakePosition;
671 if ((take != first) && (take != second) /* Full? */
672 && myReservePosition.compareAndSet(reserve, second)) {
673
674 // Got two slots. Return the first.
675 result = reserve;
676
677 // Check if we looped.
678 if (second < reserve) {
679 myLooped.incrementAndGet();
680 }
681 }
682 return result;
683 }
684
685 /**
686 * Computes a new message id based on the current loop and reserve spot in
687 * the queue.
688 *
689 * @param loop
690 * The number of time the queue has looped over the queue.
691 * @param reserve
692 * The reserved position in the queue. This can be a virtual
693 * postion.
694 * @return The message id to use.
695 */
696 private int toMessageId(final int loop, final long reserve) {
697 final long loopOffset = (((long) loop) * myQueue.length);
698 if (loopOffset > MAX_MESSAGE_ID_MASK) {
699 myLooped.compareAndSet(loop, 0);
700 }
701 // Add an extra 1 so the first value is 1 instead of zero.
702 return (int) ((loopOffset + reserve) & MAX_MESSAGE_ID_MASK) + 1;
703 }
704 }