Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
FutureCallback |
|
| 2.8;2.8 | ||||
FutureCallback$PendingListener |
|
| 2.8;2.8 | ||||
FutureCallback$Sync |
|
| 2.8;2.8 |
1 | /* | |
2 | * #%L | |
3 | * FutureCallback.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client; | |
21 | ||
22 | import java.util.concurrent.CancellationException; | |
23 | import java.util.concurrent.ExecutionException; | |
24 | import java.util.concurrent.Executor; | |
25 | import java.util.concurrent.Future; | |
26 | import java.util.concurrent.TimeUnit; | |
27 | import java.util.concurrent.TimeoutException; | |
28 | import java.util.concurrent.atomic.AtomicReference; | |
29 | import java.util.concurrent.locks.AbstractQueuedSynchronizer; | |
30 | ||
31 | import com.allanbank.mongodb.Callback; | |
32 | import com.allanbank.mongodb.ListenableFuture; | |
33 | import com.allanbank.mongodb.LockType; | |
34 | import com.allanbank.mongodb.client.callback.ReplyHandler; | |
35 | import com.allanbank.mongodb.util.Assertions; | |
36 | import com.allanbank.mongodb.util.log.Log; | |
37 | import com.allanbank.mongodb.util.log.LogFactory; | |
38 | ||
39 | /** | |
40 | * Implementation of a {@link Callback} and {@link ListenableFuture} interfaces. | |
41 | * Used to convert a {@link Callback} into a {@link ListenableFuture} for | |
42 | * returning to callers. | |
43 | * | |
44 | * @param <V> | |
45 | * The type for the set value. | |
46 | * | |
47 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
48 | * mutated in incompatible ways between any two releases of the driver. | |
49 | * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved | |
50 | */ | |
51 | public class FutureCallback<V> implements ListenableFuture<V>, Callback<V> { | |
52 | ||
53 | /** Logger to log exceptions caught when running myPendingListeners. */ | |
54 | 1 | public static final Log LOG = LogFactory.getLog(FutureCallback.class); |
55 | ||
56 | /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */ | |
57 | 1 | public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100; |
58 | ||
59 | /** Number of times to spin before trying something different. */ | |
60 | private static final int SPIN_ITERATIONS = 10000; | |
61 | ||
62 | /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */ | |
63 | 1 | private static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1; |
64 | ||
65 | /** The type of lock to use when waiting for the future to be fulfilled. */ | |
66 | private final LockType myLockType; | |
67 | ||
68 | /** The runnable, executor pairs to execute within a singly linked list. */ | |
69 | private AtomicReference<PendingListener> myPendingListeners; | |
70 | ||
71 | /** Synchronization control for this Future. */ | |
72 | private final Sync<V> mySync; | |
73 | ||
74 | /** | |
75 | * Create a new FutureCallback. | |
76 | */ | |
77 | public FutureCallback() { | |
78 | 1001011 | this(LockType.MUTEX); |
79 | 1001011 | } |
80 | ||
81 | /** | |
82 | * Create a new FutureCallback. | |
83 | * | |
84 | * @param lockType | |
85 | * The type of lock to use when waiting for the future to be | |
86 | * fulfilled. | |
87 | */ | |
88 | 1001196 | public FutureCallback(final LockType lockType) { |
89 | 1001196 | mySync = new Sync<V>(); |
90 | 1001196 | myLockType = lockType; |
91 | 1001196 | myPendingListeners = new AtomicReference<PendingListener>(null); |
92 | 1001196 | } |
93 | ||
94 | /** | |
95 | * {@inheritDoc} | |
96 | */ | |
97 | @Override | |
98 | public void addListener(final Runnable runnable, final Executor executor) { | |
99 | 102 | Assertions.assertNotNull(runnable, "Runnable is null."); |
100 | 101 | Assertions.assertNotNull(executor, "Executor is null."); |
101 | ||
102 | 102 | if (!isDone()) { |
103 | 27 | PendingListener existing = myPendingListeners.get(); |
104 | 27 | PendingListener listener = new PendingListener(runnable, executor, |
105 | existing); | |
106 | ||
107 | 37 | while (!myPendingListeners.compareAndSet(existing, listener)) { |
108 | 9 | existing = myPendingListeners.get(); |
109 | 9 | listener = new PendingListener(runnable, executor, existing); |
110 | } | |
111 | ||
112 | 28 | if (isDone()) { |
113 | 0 | execute(); |
114 | } | |
115 | 28 | } |
116 | else { | |
117 | // Run the executor. | |
118 | 75 | execute(executor, runnable); |
119 | } | |
120 | 99 | } |
121 | ||
122 | /** | |
123 | * {@inheritDoc} | |
124 | * <p> | |
125 | * Sets the value for the future and triggers any pending {@link #get} to | |
126 | * return. | |
127 | * </p> | |
128 | * | |
129 | * @see Callback#callback | |
130 | */ | |
131 | @Override | |
132 | public void callback(final V result) { | |
133 | 1001035 | final boolean set = mySync.set(result); |
134 | 1001035 | if (set) { |
135 | 1001034 | execute(); |
136 | } | |
137 | 1001035 | } |
138 | ||
139 | /** | |
140 | * {@inheritDoc} | |
141 | * <p> | |
142 | * If not cancelled and the callback has not completed then cancels the | |
143 | * future, triggers the return of any pending {@link #get()} and returns | |
144 | * true. Otherwise returns false. This does not stop the related MongoDB | |
145 | * invocation. | |
146 | * </p> | |
147 | * | |
148 | * @see Future#cancel(boolean) | |
149 | */ | |
150 | @Override | |
151 | public boolean cancel(final boolean mayInterruptIfRunning) { | |
152 | 103 | if (!mySync.cancel(mayInterruptIfRunning)) { |
153 | 101 | return false; |
154 | } | |
155 | 4 | execute(); |
156 | ||
157 | 4 | return true; |
158 | } | |
159 | ||
160 | /** | |
161 | * {@inheritDoc} | |
162 | * <p> | |
163 | * Sets the exception for the future and triggers any pending {@link #get} | |
164 | * to throw a {@link ExecutionException}. | |
165 | * </p> | |
166 | * | |
167 | * @see Callback#exception | |
168 | */ | |
169 | @Override | |
170 | public void exception(final Throwable thrown) { | |
171 | 63 | Assertions.assertNotNull(thrown, "Cannot set a null exception."); |
172 | ||
173 | 63 | final boolean set = mySync.setException(thrown); |
174 | 63 | if (set) { |
175 | 61 | execute(); |
176 | } | |
177 | 63 | } |
178 | ||
179 | /** | |
180 | * {@inheritDoc} | |
181 | * <p> | |
182 | * Returns the value set via the {@link Callback}. | |
183 | * </p> | |
184 | * | |
185 | * @see Future#get() | |
186 | */ | |
187 | @Override | |
188 | public V get() throws InterruptedException, ExecutionException { | |
189 | ||
190 | 1000846 | if (myLockType == LockType.LOW_LATENCY_SPIN) { |
191 | 2 | long now = 0; |
192 | 2 | long spinDeadline = 1; |
193 | 2 | long yeildDeadline = 1; |
194 | 7 | while ((now < yeildDeadline) && !isDone()) { |
195 | 5 | for (int i = 0; (i < SPIN_ITERATIONS) && !isDone(); ++i) { |
196 | // Hard spin... | |
197 | } | |
198 | ||
199 | 5 | if (!isDone()) { |
200 | // Pause? | |
201 | 4 | now = System.nanoTime(); |
202 | 4 | if (spinDeadline == 1) { |
203 | 2 | spinDeadline = now + SPIN_TIME_NS; |
204 | 2 | yeildDeadline = now + YIELD_TIME_NS; |
205 | // First time yield to allow other threads to do their | |
206 | // work... | |
207 | 2 | Thread.yield(); |
208 | } | |
209 | else { | |
210 | 2 | if ((spinDeadline < now) && (now < yeildDeadline)) { |
211 | 1 | Thread.yield(); |
212 | } | |
213 | } | |
214 | } | |
215 | } | |
216 | } | |
217 | ||
218 | 1000846 | final long shortPause = TimeUnit.MILLISECONDS.toNanos(10); |
219 | while (true) { | |
220 | try { | |
221 | // Either the value is available and the get() will not block | |
222 | // or we have spun for long enough and it is time to block. | |
223 | 1000855 | return mySync.get(shortPause); |
224 | } | |
225 | 9 | catch (final TimeoutException te) { |
226 | 9 | ReplyHandler.tryReceive(); |
227 | 9 | } |
228 | } | |
229 | } | |
230 | ||
231 | /** | |
232 | * {@inheritDoc} | |
233 | */ | |
234 | @Override | |
235 | public V get(final long timeout, final TimeUnit unit) | |
236 | throws InterruptedException, TimeoutException, ExecutionException { | |
237 | 116 | long now = System.nanoTime(); |
238 | 116 | final long deadline = now + unit.toNanos(timeout); |
239 | 116 | final long shortPause = TimeUnit.MILLISECONDS.toNanos(10); |
240 | while (true) { | |
241 | try { | |
242 | // Wait for the result. | |
243 | 266 | return mySync.get(Math.min((deadline - now), shortPause)); |
244 | } | |
245 | 153 | catch (final TimeoutException te) { |
246 | // Check if we should receive. | |
247 | 153 | now = System.nanoTime(); |
248 | 153 | if (now < deadline) { |
249 | 150 | ReplyHandler.tryReceive(); |
250 | } | |
251 | else { | |
252 | 3 | throw te; |
253 | } | |
254 | 150 | } |
255 | } | |
256 | } | |
257 | ||
258 | /** | |
259 | * {@inheritDoc} | |
260 | * <p> | |
261 | * Returns true if {@link #cancel(boolean)} has been called. | |
262 | * </p> | |
263 | * | |
264 | * @see Future#isCancelled() | |
265 | */ | |
266 | @Override | |
267 | public boolean isCancelled() { | |
268 | 5 | return mySync.isCancelled(); |
269 | } | |
270 | ||
271 | /** | |
272 | * {@inheritDoc} | |
273 | * <p> | |
274 | * True if a value has been set via the the {@link Callback} interface or | |
275 | * the {@link Future} has been {@link #cancel(boolean) cancelled}. | |
276 | * </p> | |
277 | * | |
278 | * @see Future#isDone() | |
279 | */ | |
280 | @Override | |
281 | public boolean isDone() { | |
282 | 49432 | return mySync.isDone(); |
283 | } | |
284 | ||
285 | /** | |
286 | * Runs this execution list, executing all existing pairs. | |
287 | * <p> | |
288 | * All callers of this method will drain the list of listeners. | |
289 | * </p> | |
290 | */ | |
291 | protected void execute() { | |
292 | PendingListener toRun; | |
293 | PendingListener next; | |
294 | ||
295 | // Keep running until the list is exhausted. | |
296 | do { | |
297 | ||
298 | // Pick the next item from the list. | |
299 | do { | |
300 | 1001127 | toRun = myPendingListeners.get(); |
301 | 1001127 | next = (toRun != null) ? toRun.myNext : null; |
302 | } | |
303 | 1001127 | while (!myPendingListeners.compareAndSet(toRun, next)); |
304 | ||
305 | // Run this item - if it exists. | |
306 | 1001127 | if (toRun != null) { |
307 | 28 | execute(toRun.myExecutor, toRun.myRunnable); |
308 | } | |
309 | } | |
310 | 1001127 | while (toRun != null); |
311 | 1001099 | } |
312 | ||
313 | /** | |
314 | * Execute the {@link Runnable} with the {@link Executor} suppressing | |
315 | * exceptions. | |
316 | * | |
317 | * @param executor | |
318 | * The executor to use. | |
319 | * @param runnable | |
320 | * The {@link Runnable} to execute. | |
321 | */ | |
322 | private void execute(final Executor executor, final Runnable runnable) { | |
323 | try { | |
324 | 103 | executor.execute(runnable); |
325 | } | |
326 | 1 | catch (final RuntimeException e) { |
327 | 1 | LOG.error(e, "Exception running a FutureListener's runnable {} " |
328 | + "with executor {}", runnable, executor); | |
329 | 96 | } |
330 | 99 | } |
331 | ||
332 | /** | |
333 | * PendingListener an immutable element in the list of listeners. | |
334 | * | |
335 | * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved | |
336 | */ | |
337 | /* package */static final class PendingListener { | |
338 | ||
339 | /** The executor to use to run the {@link Runnable}. */ | |
340 | /* package */final Executor myExecutor; | |
341 | ||
342 | /** The next item to execute. */ | |
343 | /* package */final PendingListener myNext; | |
344 | ||
345 | /** The listener's {@link Runnable}. */ | |
346 | /* package */final Runnable myRunnable; | |
347 | ||
348 | /** | |
349 | * Creates a new PendingListener. | |
350 | * | |
351 | * @param runnable | |
352 | * The listener's {@link Runnable}. | |
353 | * @param executor | |
354 | * The executor to use to run the {@link Runnable}. | |
355 | * @param next | |
356 | * The next item to execute. | |
357 | */ | |
358 | /* package */PendingListener(final Runnable runnable, | |
359 | 37 | final Executor executor, final PendingListener next) { |
360 | 37 | myRunnable = runnable; |
361 | 37 | myExecutor = executor; |
362 | 37 | myNext = next; |
363 | 37 | } |
364 | } | |
365 | ||
366 | /** | |
367 | * Tracks the state of the Future via the {@link AbstractQueuedSynchronizer} | |
368 | * model. The state starts in the {@link #RUNNING} state. The first thread | |
369 | * to complete the future moves the state to the {@link #COMPLETING} state, | |
370 | * sets the value and then sets the appropriate final state. | |
371 | * | |
372 | * @param <V> | |
373 | * The type of value for the future. | |
374 | */ | |
375 | /* package */static final class Sync<V> extends AbstractQueuedSynchronizer { | |
376 | ||
377 | /** State to represent the future was canceled. */ | |
378 | /* package */static final int CANCELED = 4; | |
379 | ||
380 | /** State to represent the value has been set. */ | |
381 | /* package */static final int COMPLETED = 2; | |
382 | ||
383 | /** State set while the value is being set. */ | |
384 | /* package */static final int COMPLETING = 1; | |
385 | ||
386 | /** State to represent the future was interrupted. */ | |
387 | /* package */static final int INTERRUPTED = 8; | |
388 | ||
389 | /** The initial running state. */ | |
390 | /* package */static final int RUNNING = 0; | |
391 | ||
392 | /** The unused value passed to {@link #acquire(int)} methods. */ | |
393 | /* package */static final int UNUSED = -1; | |
394 | ||
395 | /** Serialization version of the class. */ | |
396 | private static final long serialVersionUID = -9189950787072982459L; | |
397 | ||
398 | /** The exception for the future. */ | |
399 | private Throwable myException; | |
400 | ||
401 | /** The value set in the future. */ | |
402 | private V myValue; | |
403 | ||
404 | /** | |
405 | * Creates a new Sync. | |
406 | */ | |
407 | 1001196 | /* package */Sync() { |
408 | 1001196 | myValue = null; |
409 | 1001196 | myException = null; |
410 | 1001196 | } |
411 | ||
412 | /** | |
413 | * Acquisition succeeds if we are done, otherwise fail. | |
414 | */ | |
415 | @Override | |
416 | protected int tryAcquireShared(final int ignored) { | |
417 | 1001695 | if (isDone()) { |
418 | 1000948 | return 1; |
419 | } | |
420 | 747 | return -1; |
421 | } | |
422 | ||
423 | /** | |
424 | * We always allow a release to finish. We define it to represent that a | |
425 | * state transition completed. | |
426 | */ | |
427 | @Override | |
428 | protected boolean tryReleaseShared(final int finalState) { | |
429 | 1001099 | setState(finalState); |
430 | 1001099 | return true; |
431 | } | |
432 | ||
433 | /** | |
434 | * Move to the CANCELED or INTERRUPTED state. | |
435 | * | |
436 | * @param interrupt | |
437 | * If we are interrupted. | |
438 | * @return If the cancel worked / won. | |
439 | */ | |
440 | /* package */boolean cancel(final boolean interrupt) { | |
441 | 105 | return complete(null, null, interrupt ? INTERRUPTED : CANCELED); |
442 | } | |
443 | ||
444 | /** | |
445 | * Blocks until the future {@link #complete(Object, Throwable, int) | |
446 | * completes}. | |
447 | * | |
448 | * @return The value set for the future. | |
449 | * @throws CancellationException | |
450 | * If the future was canceled. | |
451 | * @throws ExecutionException | |
452 | * If the future failed due to an exception. | |
453 | * @throws InterruptedException | |
454 | * If these call is interrupted. | |
455 | */ | |
456 | /* package */V get() throws CancellationException, ExecutionException, | |
457 | InterruptedException { | |
458 | ||
459 | // Acquire the shared lock allowing interruption. | |
460 | 0 | acquireSharedInterruptibly(UNUSED); |
461 | ||
462 | 0 | return getValue(); |
463 | } | |
464 | ||
465 | /** | |
466 | * Blocks until the future {@link #complete(Object, Throwable, int) | |
467 | * completes} or the number of nano-seconds expires. | |
468 | * | |
469 | * @param nanos | |
470 | * The number of nano-seconds to wait. | |
471 | * @return The value set for the future. | |
472 | * @throws TimeoutException | |
473 | * If this call time expires. | |
474 | * @throws CancellationException | |
475 | * If the future was canceled. | |
476 | * @throws ExecutionException | |
477 | * If the future failed due to an exception. | |
478 | * @throws InterruptedException | |
479 | * If these call is interrupted. | |
480 | */ | |
481 | /* package */V get(final long nanos) throws TimeoutException, | |
482 | CancellationException, ExecutionException, InterruptedException { | |
483 | ||
484 | // Attempt to acquire the shared lock with a timeout. | |
485 | 1001121 | if (!tryAcquireSharedNanos(UNUSED, nanos)) { |
486 | 162 | throw new TimeoutException("Timeout waiting for task."); |
487 | } | |
488 | ||
489 | 1000948 | return getValue(); |
490 | } | |
491 | ||
492 | /** | |
493 | * Checks if the state is {@link #CANCELED} or {@link #INTERRUPTED}. | |
494 | * | |
495 | * @return True if the future state is {@link #CANCELED} or | |
496 | * {@link #INTERRUPTED}. | |
497 | */ | |
498 | /* package */boolean isCancelled() { | |
499 | 5 | return (getState() & (CANCELED | INTERRUPTED)) != 0; |
500 | } | |
501 | ||
502 | /** | |
503 | * Checks if the state is {@link #COMPLETED}, {@link #CANCELED} or | |
504 | * {@link #INTERRUPTED}. | |
505 | * | |
506 | * @return True if the future state is {@link #COMPLETED}, | |
507 | * {@link #CANCELED} or {@link #INTERRUPTED}. | |
508 | */ | |
509 | /* package */boolean isDone() { | |
510 | 1051128 | return (getState() & (COMPLETED | CANCELED | INTERRUPTED)) != 0; |
511 | } | |
512 | ||
513 | /** | |
514 | * Move to the COMPLETED state and set the value. | |
515 | * | |
516 | * @param value | |
517 | * The value to set. | |
518 | * @return If the set worked / won. | |
519 | */ | |
520 | /* package */boolean set(final V value) { | |
521 | 1001035 | return complete(value, null, COMPLETED); |
522 | } | |
523 | ||
524 | /** | |
525 | * Move to the COMPLETED state and set the exception value. | |
526 | * | |
527 | * @param thrown | |
528 | * The exception to set. | |
529 | * @return If the set worked / won. | |
530 | */ | |
531 | /* package */boolean setException(final Throwable thrown) { | |
532 | 63 | return complete(null, thrown, COMPLETED); |
533 | } | |
534 | ||
535 | /** | |
536 | * Completes the future. | |
537 | * | |
538 | * @param value | |
539 | * The value to set as the result of the future. | |
540 | * @param thrown | |
541 | * the exception to set as the result of the future. | |
542 | * @param finalState | |
543 | * the state to transition to. | |
544 | * @return Returns true if the completion was successful / won. | |
545 | */ | |
546 | private boolean complete(final V value, final Throwable thrown, | |
547 | final int finalState) { | |
548 | ||
549 | // Move to COMPLETING to see if we are the first to complete. | |
550 | 1001203 | final boolean won = compareAndSetState(RUNNING, COMPLETING); |
551 | 1001203 | if (won) { |
552 | 1001099 | this.myValue = value; |
553 | 1001099 | this.myException = ((finalState & (CANCELED | INTERRUPTED)) != 0) ? new CancellationException( |
554 | "Future was canceled.") : thrown; | |
555 | ||
556 | // Release all of the waiting threads. | |
557 | 1001099 | releaseShared(finalState); |
558 | } | |
559 | 104 | else if (getState() == COMPLETING) { |
560 | // Block until done. | |
561 | 0 | acquireShared(UNUSED); |
562 | } | |
563 | ||
564 | 1001202 | return won; |
565 | } | |
566 | ||
567 | /** | |
568 | * Implementation to get the future's value. | |
569 | * | |
570 | * @return The value set for the future. | |
571 | * @throws CancellationException | |
572 | * If the future was canceled. | |
573 | * @throws ExecutionException | |
574 | * If the future failed due to an exception. | |
575 | */ | |
576 | private V getValue() throws CancellationException, ExecutionException { | |
577 | 1000948 | final int state = getState(); |
578 | 1000948 | switch (state) { |
579 | case COMPLETED: | |
580 | 1000944 | if (myException != null) { |
581 | 35 | throw new ExecutionException(myException); |
582 | } | |
583 | 1000909 | return myValue; |
584 | ||
585 | case CANCELED: | |
586 | case INTERRUPTED: | |
587 | 4 | final CancellationException cancellation = new CancellationException( |
588 | "Future was canceled."); | |
589 | 4 | cancellation.initCause(myException); |
590 | ||
591 | 4 | throw cancellation; |
592 | ||
593 | default: | |
594 | 0 | throw new IllegalStateException("Sync in invalid state: " |
595 | + state); | |
596 | } | |
597 | } | |
598 | } | |
599 | ||
600 | } |