Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
PendingMessageQueue |
|
| 3.5789473684210527;3.579 |
1 | /* | |
2 | * #%L | |
3 | * PendingMessageQueue.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client.message; | |
21 | ||
22 | import java.util.List; | |
23 | import java.util.concurrent.TimeUnit; | |
24 | import java.util.concurrent.atomic.AtomicInteger; | |
25 | import java.util.concurrent.locks.Condition; | |
26 | import java.util.concurrent.locks.Lock; | |
27 | import java.util.concurrent.locks.ReentrantLock; | |
28 | ||
29 | import com.allanbank.mongodb.LockType; | |
30 | import com.allanbank.mongodb.client.Message; | |
31 | import com.allanbank.mongodb.client.callback.ReplyCallback; | |
32 | ||
33 | /** | |
34 | * PendingMessageQueue provides an optimized queue for pending messages inspired | |
35 | * by the Disruptor project. | |
36 | * <p> | |
37 | * To reduce thread contention the queue uses a set of integer values to track | |
38 | * the position of the ready messages (the last message that is ready to be | |
39 | * read), reserve (the first message that can be reserved to be written to), and | |
40 | * the take (the next (first) message to be read). For an infinite queue the | |
41 | * following invariant holds: <blockquote> | |
42 | * | |
43 | * <pre> | |
44 | * <code> | |
45 | * take < readyBefore <= reserve | |
46 | * </code> | |
47 | * </pre> | |
48 | * | |
49 | * </blockquote> To make handling a limited size queue easier the size of the | |
50 | * queue is forced to power of 2 less than {@value #MAX_SIZE}. The roll over can | |
51 | * then be handled with a simple mask operation. | |
52 | * </p> | |
53 | * <p> | |
54 | * Rather than allocate a pending message per request we use an array of | |
55 | * pre-allocated PendingMessages and copy the data into and out of the objects. | |
56 | * this has a net positive effect on object allocation and garbage collection | |
57 | * time at the cost of a longer initialization. | |
58 | * </p> | |
59 | * <p> | |
60 | * Lastly, This queue assumes there is a single consumer of messages. This is | |
61 | * true for the driver's use case but don't copy the code and expect it to work | |
62 | * with multiple consumers. The consumer should use the following basic | |
63 | * structure: <blockquote> | |
64 | * | |
65 | * <pre> | |
66 | * <code> | |
67 | * PendingMessage pm = new {@link PendingMessage}(); | |
68 | * | |
69 | * queue.take(pm); // Blocks. | |
70 | * // Handle the message. | |
71 | * | |
72 | * // or | |
73 | * | |
74 | * if( queue.poll(pm) ) { // Non-blocking. | |
75 | * // Handle The Message. | |
76 | * } | |
77 | * </code> | |
78 | * </pre> | |
79 | * | |
80 | * </blockquote> | |
81 | * </p> | |
82 | * <p> | |
83 | * <b>Warning: </b> This class has been carefully tuned for the driver's use | |
84 | * case. Changes should be carefully bench marked and tested. Comments have been | |
85 | * embedded in the source indicating attempted changes and reverts. Due to its | |
86 | * position in the driver subtle changes in this class can cause large changes | |
87 | * in the performance of the driver. | |
88 | * </p> | |
89 | * | |
90 | * @see <a href="http://code.google.com/p/disruptor/">Disruptor Project</a> | |
91 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
92 | * mutated in incompatible ways between any two releases of the driver. | |
93 | * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved | |
94 | */ | |
95 | public final class PendingMessageQueue { | |
96 | ||
97 | /** The mask for constraining the size the message id. */ | |
98 | public static final long MAX_MESSAGE_ID_MASK = 0x0FFFFFFF; | |
99 | ||
100 | /** | |
101 | * The maximum size of the queue. This it currently 2^20 but must be at most | |
102 | * 2^30 to ensure masking works. | |
103 | */ | |
104 | public static final int MAX_SIZE = (1 << 20); | |
105 | ||
106 | /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */ | |
107 | 1 | public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100; |
108 | ||
109 | /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */ | |
110 | 1 | public static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1; |
111 | ||
112 | /** Number of times to spin before trying something different. */ | |
113 | private static final int SPIN_ITERATIONS = 10000; | |
114 | ||
115 | /** The condition used with the queue being full or empty. */ | |
116 | private final Condition myCondition; | |
117 | ||
118 | /** The mutex used with the queue. */ | |
119 | private final Lock myLock; | |
120 | ||
121 | /** The lock type to use with the queue. */ | |
122 | private final LockType myLockType; | |
123 | ||
124 | /** Tracks how many times we have looped through the ring buffer. */ | |
125 | private final AtomicInteger myLooped; | |
126 | ||
127 | /** The mask being used. */ | |
128 | private final int myMask; | |
129 | ||
130 | /** The queue of pending messages. */ | |
131 | private final PendingMessage[] myQueue; | |
132 | ||
133 | /** | |
134 | * The position of the last message that is ready to be taken. | |
135 | * <p> | |
136 | * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the | |
137 | * queue is empty. | |
138 | * </p> | |
139 | */ | |
140 | private final AtomicInteger myReadyBeforePosition; | |
141 | ||
142 | /** | |
143 | * The position of the next message that can be reserved. | |
144 | * <p> | |
145 | * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the | |
146 | * queue is full. | |
147 | * </p> | |
148 | */ | |
149 | private final AtomicInteger myReservePosition; | |
150 | ||
151 | /** | |
152 | * The position of the next message that can be taken. | |
153 | * <p> | |
154 | * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the | |
155 | * queue is full. | |
156 | * </p> | |
157 | * <p> | |
158 | * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the | |
159 | * queue is empty. | |
160 | * </p> | |
161 | */ | |
162 | private volatile int myTakePosition; | |
163 | ||
164 | /** Tracks how many threads are waiting for a message or a space to open. */ | |
165 | private final AtomicInteger myWaiting; | |
166 | ||
167 | /** | |
168 | * Creates a new PendingMessageQueue. | |
169 | * | |
170 | * @param size | |
171 | * The size of the queue to create. | |
172 | * @param lockType | |
173 | * The lock type to use with the queue. | |
174 | */ | |
175 | 254 | public PendingMessageQueue(final int size, final LockType lockType) { |
176 | 254 | int power = size; |
177 | 254 | if (MAX_SIZE < size) { |
178 | 2 | power = MAX_SIZE; |
179 | } | |
180 | 252 | else if (Integer.bitCount(size) != 1) { |
181 | // Find the next larger power of 2. | |
182 | 2 | power = 1; |
183 | 22 | while ((power < size) && (power != 0)) { |
184 | 20 | power <<= 1; |
185 | } | |
186 | } | |
187 | ||
188 | 254 | myLockType = lockType; |
189 | 254 | myQueue = new PendingMessage[power]; |
190 | 2347774 | for (int i = 0; i < myQueue.length; ++i) { |
191 | 2347520 | myQueue[i] = new PendingMessage(0, null); |
192 | } | |
193 | 254 | myMask = (power - 1); |
194 | ||
195 | 254 | myLooped = new AtomicInteger(0); |
196 | 254 | myTakePosition = -1; |
197 | 254 | myReadyBeforePosition = new AtomicInteger(0); |
198 | 254 | myReservePosition = new AtomicInteger(0); |
199 | 254 | myWaiting = new AtomicInteger(0); |
200 | ||
201 | 254 | myLock = new ReentrantLock(); |
202 | 254 | myCondition = myLock.newCondition(); |
203 | 254 | } |
204 | ||
205 | /** | |
206 | * Returns the size of the queue. | |
207 | * | |
208 | * @return The size of the queue. | |
209 | */ | |
210 | public int capacity() { | |
211 | 54260 | return myQueue.length - 1; |
212 | } | |
213 | ||
214 | /** | |
215 | * Drains the list of pending messages into the provided list. | |
216 | * | |
217 | * @param pending | |
218 | * The list to add all of the pending messages to. | |
219 | */ | |
220 | public void drainTo(final List<PendingMessage> pending) { | |
221 | 2 | PendingMessage pm = new PendingMessage(); |
222 | 2048 | while (poll(pm)) { |
223 | 2046 | pending.add(pm); |
224 | 2046 | pm = new PendingMessage(); |
225 | } | |
226 | 2 | } |
227 | ||
228 | /** | |
229 | * Returns true if the queue is empty. e.g., the next take position is the | |
230 | * read before position. | |
231 | * | |
232 | * @return If the queue is empty. | |
233 | */ | |
234 | public boolean isEmpty() { | |
235 | 10467 | final int take = myTakePosition; |
236 | 10467 | final int readyBefore = myReadyBeforePosition.get(); |
237 | ||
238 | 10467 | return (readyBefore == take) || (take < 0); |
239 | } | |
240 | ||
241 | /** | |
242 | * Puts a message onto the queue. This method will not block waiting for a | |
243 | * space to add the message. | |
244 | * | |
245 | * @param message | |
246 | * The message to add. | |
247 | * @param replyCallback | |
248 | * The callback for the message to add. | |
249 | * @return True if the message was added, false otherwise. | |
250 | */ | |
251 | public boolean offer(final Message message, | |
252 | final ReplyCallback replyCallback) { | |
253 | ||
254 | 0 | final int loop = myLooped.get(); |
255 | 0 | final int reserve = offer(); |
256 | 0 | if (reserve < 0) { |
257 | 0 | return false; |
258 | } | |
259 | ||
260 | 0 | final int messageid = toMessageId(loop, reserve); |
261 | ||
262 | 0 | myQueue[reserve].set(messageid, message, replyCallback); |
263 | ||
264 | 0 | markReady(reserve); |
265 | ||
266 | 0 | return true; |
267 | } | |
268 | ||
269 | /** | |
270 | * Puts a message onto the queue. This method will not block waiting for a | |
271 | * space to add the message. | |
272 | * | |
273 | * @param pendingMessage | |
274 | * The message to add. | |
275 | * @return True if the message was added, false otherwise. | |
276 | */ | |
277 | public boolean offer(final PendingMessage pendingMessage) { | |
278 | 8464 | final int reserve = offer(); |
279 | 8464 | if (reserve < 0) { |
280 | 2 | return false; |
281 | } | |
282 | ||
283 | 8462 | myQueue[reserve].set(pendingMessage); |
284 | ||
285 | 8462 | markReady(reserve); |
286 | ||
287 | 8462 | return true; |
288 | } | |
289 | ||
290 | /** | |
291 | * Returns the next message from the queue without blocking. <blockquote> | |
292 | * | |
293 | * <pre> | |
294 | * <code> | |
295 | * PendingMessage pm = new PendingMessage(); | |
296 | * if( queue.poll(pm) } { | |
297 | * // Handle the message copied into pm. | |
298 | * } | |
299 | * </code> | |
300 | * </pre> | |
301 | * | |
302 | * </blockquote> | |
303 | * | |
304 | * @param copyOut | |
305 | * The {@link PendingMessage} to copy the pending message into. | |
306 | * @return True if the pending message was updated. | |
307 | */ | |
308 | public boolean poll(final PendingMessage copyOut) { | |
309 | 344965415 | boolean result = false; |
310 | 344965415 | final int take = myTakePosition; |
311 | 344965415 | if ((myReadyBeforePosition.get() != take) && (take >= 0)) { // Empty, |
312 | // Not | |
313 | // started? | |
314 | 484240 | copyOut.set(myQueue[take]); |
315 | 484240 | myQueue[take].clear(); |
316 | 484240 | result = true; |
317 | ||
318 | 484240 | myTakePosition = increment(take); |
319 | 484240 | notifyWaiters(false); |
320 | } | |
321 | ||
322 | 344965415 | return result; |
323 | } | |
324 | ||
325 | /** | |
326 | * Puts a message onto the queue. This method will block waiting for a space | |
327 | * to add the message. | |
328 | * | |
329 | * @param message | |
330 | * The message to add. | |
331 | * @param replyCallback | |
332 | * The callback for the message to add. | |
333 | * | |
334 | * @throws InterruptedException | |
335 | * If the thread is interrupted while waiting for the message. | |
336 | * If thrown the message will not have been enqueued. | |
337 | */ | |
338 | public void put(final Message message, final ReplyCallback replyCallback) | |
339 | throws InterruptedException { | |
340 | ||
341 | 24586 | int loop = myLooped.get(); |
342 | 24586 | int reserve = offer(); |
343 | 24586 | if (reserve < 0) { |
344 | ||
345 | // Spinning here appears to slow things down. | |
346 | ||
347 | // Block. | |
348 | try { | |
349 | 2 | myWaiting.incrementAndGet(); |
350 | 2 | myLock.lock(); |
351 | ||
352 | 2 | loop = myLooped.get(); |
353 | 2 | reserve = offer(); |
354 | 2 | while (reserve < 0) { |
355 | 2 | myCondition.await(); |
356 | 0 | loop = myLooped.get(); |
357 | 0 | reserve = offer(); |
358 | } | |
359 | } | |
360 | finally { | |
361 | 2 | myLock.unlock(); |
362 | 2 | myWaiting.decrementAndGet(); |
363 | 0 | } |
364 | } | |
365 | ||
366 | 24584 | final int messageid = toMessageId(loop, reserve); |
367 | ||
368 | 24584 | myQueue[reserve].set(messageid, message, replyCallback); |
369 | ||
370 | 24584 | markReady(reserve); |
371 | 24584 | } |
372 | ||
373 | /** | |
374 | * Puts two messages onto the queue. This method will block waiting for a | |
375 | * space to add the messages but ensures the messages are in sequence in the | |
376 | * queue. | |
377 | * | |
378 | * @param message | |
379 | * The first message to add. | |
380 | * @param replyCallback | |
381 | * The callback for the first message to add. | |
382 | * @param message2 | |
383 | * The second message to add. | |
384 | * @param replyCallback2 | |
385 | * The callback for the second message to add. | |
386 | * | |
387 | * @throws InterruptedException | |
388 | * If the thread is interrupted while waiting for the message. | |
389 | * If thrown neither message will have been enqueued. | |
390 | */ | |
391 | public void put(final Message message, final ReplyCallback replyCallback, | |
392 | final Message message2, final ReplyCallback replyCallback2) | |
393 | throws InterruptedException { | |
394 | 3073 | int loop = myLooped.get(); |
395 | 3073 | int reserve = offer2(); |
396 | 3073 | if (reserve < 0) { |
397 | ||
398 | // Spinning here appears to slow things down. | |
399 | try { | |
400 | 2 | myWaiting.incrementAndGet(); |
401 | 2 | myLock.lock(); |
402 | ||
403 | 2 | loop = myLooped.get(); |
404 | 2 | reserve = offer2(); |
405 | 2 | while (reserve < 0) { |
406 | 2 | myCondition.await(); |
407 | 0 | loop = myLooped.get(); |
408 | 0 | reserve = offer2(); |
409 | } | |
410 | } | |
411 | finally { | |
412 | 2 | myLock.unlock(); |
413 | 2 | myWaiting.decrementAndGet(); |
414 | 0 | } |
415 | } | |
416 | ||
417 | // Use reserve + 1 for the second message id since it may have looped | |
418 | // and then the math does not work out causing messageId2 to be lower | |
419 | // than | |
420 | // messageId1, which is bad. | |
421 | 3071 | final int messageId1 = toMessageId(loop, reserve); |
422 | 3071 | final int messageId2 = toMessageId(loop, reserve + 1); |
423 | ||
424 | 3071 | final int second = increment(reserve); |
425 | 3071 | myQueue[reserve].set(messageId1, message, replyCallback); |
426 | 3071 | myQueue[second].set(messageId2, message2, replyCallback2); |
427 | ||
428 | 3071 | markReady2(reserve); |
429 | 3071 | } |
430 | ||
431 | /** | |
432 | * Puts a message onto the queue. This method will block waiting for a space | |
433 | * to add the message. | |
434 | * | |
435 | * @param pendingMessage | |
436 | * The message to add. | |
437 | * | |
438 | * @throws InterruptedException | |
439 | * If the thread is interrupted while waiting for the message. | |
440 | * If thrown the message will not have been enqueued. | |
441 | */ | |
442 | public void put(final PendingMessage pendingMessage) | |
443 | throws InterruptedException { | |
444 | 452738 | int reserve = offer(); |
445 | 453154 | if (reserve < 0) { |
446 | ||
447 | // Spinning here appears to slow things down. | |
448 | ||
449 | try { | |
450 | 16162 | myWaiting.incrementAndGet(); |
451 | 16170 | myLock.lock(); |
452 | ||
453 | 16178 | reserve = offer(); |
454 | 199767 | while (reserve < 0) { |
455 | 183591 | myCondition.await(); |
456 | 183589 | reserve = offer(); |
457 | } | |
458 | } | |
459 | finally { | |
460 | 16178 | myLock.unlock(); |
461 | 16178 | myWaiting.decrementAndGet(); |
462 | 16176 | } |
463 | } | |
464 | ||
465 | 453234 | myQueue[reserve].set(pendingMessage); |
466 | ||
467 | 453222 | markReady(reserve); |
468 | 453188 | } |
469 | ||
470 | /** | |
471 | * Returns the number of messages in the queue. | |
472 | * | |
473 | * @return The number of messages in the queue. | |
474 | */ | |
475 | public int size() { | |
476 | 11267 | final int take = myTakePosition; |
477 | 11267 | final int ready = myReadyBeforePosition.get(); |
478 | ||
479 | 11267 | if (take < 0) { |
480 | 3 | return 0; |
481 | } | |
482 | 11264 | else if (take <= ready) { |
483 | 11256 | return (ready - take); |
484 | } | |
485 | ||
486 | 8 | return (myQueue.length - take) + ready; |
487 | } | |
488 | ||
489 | /** | |
490 | * Returns the next message from the queue and will block waiting for a | |
491 | * message. | |
492 | * | |
493 | * @param copyOut | |
494 | * The {@link PendingMessage} to copy the pending message into. | |
495 | * @throws InterruptedException | |
496 | * If the thread is interrupted while waiting for the message. | |
497 | */ | |
498 | public void take(final PendingMessage copyOut) throws InterruptedException { | |
499 | 477875 | if (!poll(copyOut)) { |
500 | ||
501 | // Spin/yeild loop. | |
502 | 87099 | if (myLockType == LockType.LOW_LATENCY_SPIN) { |
503 | 59623 | long now = 0; |
504 | 59623 | long spinDeadline = 1; |
505 | 59623 | long yeildDeadline = 1; |
506 | 86217 | while (now < yeildDeadline) { |
507 | 344454520 | for (int i = 0; i < SPIN_ITERATIONS; ++i) { |
508 | 344427926 | if (poll(copyOut)) { |
509 | 58587 | return; |
510 | } | |
511 | } | |
512 | ||
513 | // Pause? | |
514 | 26594 | now = System.nanoTime(); |
515 | 26594 | if (spinDeadline == 1) { |
516 | 10009 | spinDeadline = now + SPIN_TIME_NS; |
517 | 10009 | yeildDeadline = now + YIELD_TIME_NS; |
518 | // First time free pass. | |
519 | } | |
520 | else { | |
521 | 16585 | if ((spinDeadline < now) && (now < yeildDeadline)) { |
522 | 15549 | Thread.yield(); |
523 | } | |
524 | } | |
525 | } | |
526 | } | |
527 | ||
528 | // Block. | |
529 | try { | |
530 | 28512 | myWaiting.incrementAndGet(); |
531 | 28512 | myLock.lock(); |
532 | ||
533 | 52998 | while (!poll(copyOut)) { |
534 | 24533 | myCondition.await(); |
535 | } | |
536 | } | |
537 | finally { | |
538 | 28512 | myLock.unlock(); |
539 | 28512 | myWaiting.decrementAndGet(); |
540 | 28465 | } |
541 | } | |
542 | 419241 | } |
543 | ||
544 | /** | |
545 | * Increments the index handling roll-over. | |
546 | * | |
547 | * @param index | |
548 | * The value to increment. | |
549 | * @return The incremented value. | |
550 | */ | |
551 | protected int increment(final int index) { | |
552 | 1571997 | return ((index + 1) & myMask); |
553 | } | |
554 | ||
555 | /** | |
556 | * Marks the position as ready by incrementing the ready position to the | |
557 | * provided position. This method uses a spin lock assuming any other | |
558 | * threads will increment the ready position quickly to the position just | |
559 | * before {@code index}. | |
560 | * | |
561 | * @param index | |
562 | * The index of the ready message. | |
563 | */ | |
564 | protected void markReady(final int index) { | |
565 | 486227 | final int after = increment(index); |
566 | ||
567 | 488498 | while (!myReadyBeforePosition.compareAndSet(index, after)) { |
568 | // Spinning here slows things down because we know that the other | |
569 | // thread should be runnable. Always Yield. | |
570 | 2470 | Thread.yield(); |
571 | } | |
572 | ||
573 | // Pull take position into the queue. | |
574 | 486231 | if ((index == 0) && (myTakePosition == -1)) { |
575 | 174 | myTakePosition = index; |
576 | } | |
577 | ||
578 | 486232 | notifyWaiters(false); |
579 | 486223 | } |
580 | ||
581 | /** | |
582 | * Marks the position and the next position as ready by incrementing the | |
583 | * ready position to the provided position + 1. This method uses a spin lock | |
584 | * assuming any other threads will increment the ready position quickly to | |
585 | * the position just before {@code index}. | |
586 | * | |
587 | * @param index | |
588 | * The index of the ready message. | |
589 | */ | |
590 | protected void markReady2(final int index) { | |
591 | 3071 | final int after = increment(index); |
592 | 3071 | final int twoAfter = increment(after); |
593 | ||
594 | 3071 | while (!myReadyBeforePosition.compareAndSet(index, twoAfter)) { |
595 | // Just keep swimming... | |
596 | 0 | Thread.yield(); |
597 | } | |
598 | ||
599 | // Pull take position into the queue. | |
600 | 3071 | if ((index == 0) && (myTakePosition == -1)) { |
601 | 3 | myTakePosition = index; |
602 | } | |
603 | ||
604 | // If someone is waiting let them know we created two messages. | |
605 | 3071 | notifyWaiters(true); |
606 | 3071 | } |
607 | ||
608 | /** | |
609 | * Notifies the waiting threads that the state of the queue has changed. | |
610 | * | |
611 | * @param all | |
612 | * If true then all threads will be woken. Otherwise only a | |
613 | * single thread is woken. | |
614 | */ | |
615 | protected void notifyWaiters(final boolean all) { | |
616 | 966117 | if (myWaiting.get() > 0) { |
617 | try { | |
618 | 523124 | myLock.lock(); |
619 | 523382 | if (all) { |
620 | 1 | myCondition.signalAll(); |
621 | } | |
622 | else { | |
623 | 523381 | myCondition.signal(); |
624 | } | |
625 | } | |
626 | finally { | |
627 | 523382 | myLock.unlock(); |
628 | 523283 | } |
629 | } | |
630 | 967782 | } |
631 | ||
632 | /** | |
633 | * Checks if there is remove for another message. If so returns the index of | |
634 | * the message to update. If not return a value less then zero. | |
635 | * | |
636 | * @return The position of the message that can be updated or a value of | |
637 | * less than zero if the queue is full. | |
638 | */ | |
639 | protected int offer() { | |
640 | 684936 | int result = -1; |
641 | 684997 | final int reserve = myReservePosition.get(); |
642 | 685368 | final int next = increment(reserve); |
643 | 685428 | if ((myTakePosition != next) /* Full? */ |
644 | && myReservePosition.compareAndSet(reserve, next)) { | |
645 | ||
646 | // Got a slot. | |
647 | 486280 | result = reserve; |
648 | ||
649 | // Check if we looped. | |
650 | 486282 | if (next < reserve) { |
651 | 1754 | myLooped.incrementAndGet(); |
652 | } | |
653 | } | |
654 | 685680 | return result; |
655 | } | |
656 | ||
657 | /** | |
658 | * Checks if there is remove for another two message. If so returns the | |
659 | * index of the first message to update. If not return a value less then | |
660 | * zero. | |
661 | * | |
662 | * @return The position of the first message that can be updated or a value | |
663 | * of less than zero if the queue is full. | |
664 | */ | |
665 | protected int offer2() { | |
666 | 3075 | int result = -1; |
667 | 3075 | final int reserve = myReservePosition.get(); |
668 | 3075 | final int first = increment(reserve); |
669 | 3075 | final int second = increment(first); |
670 | 3075 | final int take = myTakePosition; |
671 | 3075 | if ((take != first) && (take != second) /* Full? */ |
672 | && myReservePosition.compareAndSet(reserve, second)) { | |
673 | ||
674 | // Got two slots. Return the first. | |
675 | 3071 | result = reserve; |
676 | ||
677 | // Check if we looped. | |
678 | 3071 | if (second < reserve) { |
679 | 4 | myLooped.incrementAndGet(); |
680 | } | |
681 | } | |
682 | 3075 | return result; |
683 | } | |
684 | ||
685 | /** | |
686 | * Computes a new message id based on the current loop and reserve spot in | |
687 | * the queue. | |
688 | * | |
689 | * @param loop | |
690 | * The number of time the queue has looped over the queue. | |
691 | * @param reserve | |
692 | * The reserved position in the queue. This can be a virtual | |
693 | * postion. | |
694 | * @return The message id to use. | |
695 | */ | |
696 | private int toMessageId(final int loop, final long reserve) { | |
697 | 30726 | final long loopOffset = (((long) loop) * myQueue.length); |
698 | 30726 | if (loopOffset > MAX_MESSAGE_ID_MASK) { |
699 | 0 | myLooped.compareAndSet(loop, 0); |
700 | } | |
701 | // Add an extra 1 so the first value is 1 instead of zero. | |
702 | 30726 | return (int) ((loopOffset + reserve) & MAX_MESSAGE_ID_MASK) + 1; |
703 | } | |
704 | } |