1 /*
2 * #%L
3 * FutureCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client;
21
22 import java.util.concurrent.CancellationException;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicReference;
29 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30
31 import com.allanbank.mongodb.Callback;
32 import com.allanbank.mongodb.ListenableFuture;
33 import com.allanbank.mongodb.LockType;
34 import com.allanbank.mongodb.client.callback.ReplyHandler;
35 import com.allanbank.mongodb.util.Assertions;
36 import com.allanbank.mongodb.util.log.Log;
37 import com.allanbank.mongodb.util.log.LogFactory;
38
39 /**
40 * Implementation of a {@link Callback} and {@link ListenableFuture} interfaces.
41 * Used to convert a {@link Callback} into a {@link ListenableFuture} for
42 * returning to callers.
43 *
44 * @param <V>
45 * The type for the set value.
46 *
47 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
48 * mutated in incompatible ways between any two releases of the driver.
49 * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
50 */
51 public class FutureCallback<V> implements ListenableFuture<V>, Callback<V> {
52
53 /** Logger to log exceptions caught when running myPendingListeners. */
54 public static final Log LOG = LogFactory.getLog(FutureCallback.class);
55
56 /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */
57 public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
58
59 /** Number of times to spin before trying something different. */
60 private static final int SPIN_ITERATIONS = 10000;
61
62 /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
63 private static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
64
65 /** The type of lock to use when waiting for the future to be fulfilled. */
66 private final LockType myLockType;
67
68 /** The runnable, executor pairs to execute within a singly linked list. */
69 private AtomicReference<PendingListener> myPendingListeners;
70
71 /** Synchronization control for this Future. */
72 private final Sync<V> mySync;
73
74 /**
75 * Create a new FutureCallback.
76 */
77 public FutureCallback() {
78 this(LockType.MUTEX);
79 }
80
81 /**
82 * Create a new FutureCallback.
83 *
84 * @param lockType
85 * The type of lock to use when waiting for the future to be
86 * fulfilled.
87 */
88 public FutureCallback(final LockType lockType) {
89 mySync = new Sync<V>();
90 myLockType = lockType;
91 myPendingListeners = new AtomicReference<PendingListener>(null);
92 }
93
94 /**
95 * {@inheritDoc}
96 */
97 @Override
98 public void addListener(final Runnable runnable, final Executor executor) {
99 Assertions.assertNotNull(runnable, "Runnable is null.");
100 Assertions.assertNotNull(executor, "Executor is null.");
101
102 if (!isDone()) {
103 PendingListener existing = myPendingListeners.get();
104 PendingListener listener = new PendingListener(runnable, executor,
105 existing);
106
107 while (!myPendingListeners.compareAndSet(existing, listener)) {
108 existing = myPendingListeners.get();
109 listener = new PendingListener(runnable, executor, existing);
110 }
111
112 if (isDone()) {
113 execute();
114 }
115 }
116 else {
117 // Run the executor.
118 execute(executor, runnable);
119 }
120 }
121
122 /**
123 * {@inheritDoc}
124 * <p>
125 * Sets the value for the future and triggers any pending {@link #get} to
126 * return.
127 * </p>
128 *
129 * @see Callback#callback
130 */
131 @Override
132 public void callback(final V result) {
133 final boolean set = mySync.set(result);
134 if (set) {
135 execute();
136 }
137 }
138
139 /**
140 * {@inheritDoc}
141 * <p>
142 * If not cancelled and the callback has not completed then cancels the
143 * future, triggers the return of any pending {@link #get()} and returns
144 * true. Otherwise returns false. This does not stop the related MongoDB
145 * invocation.
146 * </p>
147 *
148 * @see Future#cancel(boolean)
149 */
150 @Override
151 public boolean cancel(final boolean mayInterruptIfRunning) {
152 if (!mySync.cancel(mayInterruptIfRunning)) {
153 return false;
154 }
155 execute();
156
157 return true;
158 }
159
160 /**
161 * {@inheritDoc}
162 * <p>
163 * Sets the exception for the future and triggers any pending {@link #get}
164 * to throw a {@link ExecutionException}.
165 * </p>
166 *
167 * @see Callback#exception
168 */
169 @Override
170 public void exception(final Throwable thrown) {
171 Assertions.assertNotNull(thrown, "Cannot set a null exception.");
172
173 final boolean set = mySync.setException(thrown);
174 if (set) {
175 execute();
176 }
177 }
178
179 /**
180 * {@inheritDoc}
181 * <p>
182 * Returns the value set via the {@link Callback}.
183 * </p>
184 *
185 * @see Future#get()
186 */
187 @Override
188 public V get() throws InterruptedException, ExecutionException {
189
190 if (myLockType == LockType.LOW_LATENCY_SPIN) {
191 long now = 0;
192 long spinDeadline = 1;
193 long yeildDeadline = 1;
194 while ((now < yeildDeadline) && !isDone()) {
195 for (int i = 0; (i < SPIN_ITERATIONS) && !isDone(); ++i) {
196 // Hard spin...
197 }
198
199 if (!isDone()) {
200 // Pause?
201 now = System.nanoTime();
202 if (spinDeadline == 1) {
203 spinDeadline = now + SPIN_TIME_NS;
204 yeildDeadline = now + YIELD_TIME_NS;
205 // First time yield to allow other threads to do their
206 // work...
207 Thread.yield();
208 }
209 else {
210 if ((spinDeadline < now) && (now < yeildDeadline)) {
211 Thread.yield();
212 }
213 }
214 }
215 }
216 }
217
218 final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
219 while (true) {
220 try {
221 // Either the value is available and the get() will not block
222 // or we have spun for long enough and it is time to block.
223 return mySync.get(shortPause);
224 }
225 catch (final TimeoutException te) {
226 ReplyHandler.tryReceive();
227 }
228 }
229 }
230
231 /**
232 * {@inheritDoc}
233 */
234 @Override
235 public V get(final long timeout, final TimeUnit unit)
236 throws InterruptedException, TimeoutException, ExecutionException {
237 long now = System.nanoTime();
238 final long deadline = now + unit.toNanos(timeout);
239 final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
240 while (true) {
241 try {
242 // Wait for the result.
243 return mySync.get(Math.min((deadline - now), shortPause));
244 }
245 catch (final TimeoutException te) {
246 // Check if we should receive.
247 now = System.nanoTime();
248 if (now < deadline) {
249 ReplyHandler.tryReceive();
250 }
251 else {
252 throw te;
253 }
254 }
255 }
256 }
257
258 /**
259 * {@inheritDoc}
260 * <p>
261 * Returns true if {@link #cancel(boolean)} has been called.
262 * </p>
263 *
264 * @see Future#isCancelled()
265 */
266 @Override
267 public boolean isCancelled() {
268 return mySync.isCancelled();
269 }
270
271 /**
272 * {@inheritDoc}
273 * <p>
274 * True if a value has been set via the the {@link Callback} interface or
275 * the {@link Future} has been {@link #cancel(boolean) cancelled}.
276 * </p>
277 *
278 * @see Future#isDone()
279 */
280 @Override
281 public boolean isDone() {
282 return mySync.isDone();
283 }
284
285 /**
286 * Runs this execution list, executing all existing pairs.
287 * <p>
288 * All callers of this method will drain the list of listeners.
289 * </p>
290 */
291 protected void execute() {
292 PendingListener toRun;
293 PendingListener next;
294
295 // Keep running until the list is exhausted.
296 do {
297
298 // Pick the next item from the list.
299 do {
300 toRun = myPendingListeners.get();
301 next = (toRun != null) ? toRun.myNext : null;
302 }
303 while (!myPendingListeners.compareAndSet(toRun, next));
304
305 // Run this item - if it exists.
306 if (toRun != null) {
307 execute(toRun.myExecutor, toRun.myRunnable);
308 }
309 }
310 while (toRun != null);
311 }
312
313 /**
314 * Execute the {@link Runnable} with the {@link Executor} suppressing
315 * exceptions.
316 *
317 * @param executor
318 * The executor to use.
319 * @param runnable
320 * The {@link Runnable} to execute.
321 */
322 private void execute(final Executor executor, final Runnable runnable) {
323 try {
324 executor.execute(runnable);
325 }
326 catch (final RuntimeException e) {
327 LOG.error(e, "Exception running a FutureListener's runnable {} "
328 + "with executor {}", runnable, executor);
329 }
330 }
331
332 /**
333 * PendingListener an immutable element in the list of listeners.
334 *
335 * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
336 */
337 /* package */static final class PendingListener {
338
339 /** The executor to use to run the {@link Runnable}. */
340 /* package */final Executor myExecutor;
341
342 /** The next item to execute. */
343 /* package */final PendingListener myNext;
344
345 /** The listener's {@link Runnable}. */
346 /* package */final Runnable myRunnable;
347
348 /**
349 * Creates a new PendingListener.
350 *
351 * @param runnable
352 * The listener's {@link Runnable}.
353 * @param executor
354 * The executor to use to run the {@link Runnable}.
355 * @param next
356 * The next item to execute.
357 */
358 /* package */PendingListener(final Runnable runnable,
359 final Executor executor, final PendingListener next) {
360 myRunnable = runnable;
361 myExecutor = executor;
362 myNext = next;
363 }
364 }
365
366 /**
367 * Tracks the state of the Future via the {@link AbstractQueuedSynchronizer}
368 * model. The state starts in the {@link #RUNNING} state. The first thread
369 * to complete the future moves the state to the {@link #COMPLETING} state,
370 * sets the value and then sets the appropriate final state.
371 *
372 * @param <V>
373 * The type of value for the future.
374 */
375 /* package */static final class Sync<V> extends AbstractQueuedSynchronizer {
376
377 /** State to represent the future was canceled. */
378 /* package */static final int CANCELED = 4;
379
380 /** State to represent the value has been set. */
381 /* package */static final int COMPLETED = 2;
382
383 /** State set while the value is being set. */
384 /* package */static final int COMPLETING = 1;
385
386 /** State to represent the future was interrupted. */
387 /* package */static final int INTERRUPTED = 8;
388
389 /** The initial running state. */
390 /* package */static final int RUNNING = 0;
391
392 /** The unused value passed to {@link #acquire(int)} methods. */
393 /* package */static final int UNUSED = -1;
394
395 /** Serialization version of the class. */
396 private static final long serialVersionUID = -9189950787072982459L;
397
398 /** The exception for the future. */
399 private Throwable myException;
400
401 /** The value set in the future. */
402 private V myValue;
403
404 /**
405 * Creates a new Sync.
406 */
407 /* package */Sync() {
408 myValue = null;
409 myException = null;
410 }
411
412 /**
413 * Acquisition succeeds if we are done, otherwise fail.
414 */
415 @Override
416 protected int tryAcquireShared(final int ignored) {
417 if (isDone()) {
418 return 1;
419 }
420 return -1;
421 }
422
423 /**
424 * We always allow a release to finish. We define it to represent that a
425 * state transition completed.
426 */
427 @Override
428 protected boolean tryReleaseShared(final int finalState) {
429 setState(finalState);
430 return true;
431 }
432
433 /**
434 * Move to the CANCELED or INTERRUPTED state.
435 *
436 * @param interrupt
437 * If we are interrupted.
438 * @return If the cancel worked / won.
439 */
440 /* package */boolean cancel(final boolean interrupt) {
441 return complete(null, null, interrupt ? INTERRUPTED : CANCELED);
442 }
443
444 /**
445 * Blocks until the future {@link #complete(Object, Throwable, int)
446 * completes}.
447 *
448 * @return The value set for the future.
449 * @throws CancellationException
450 * If the future was canceled.
451 * @throws ExecutionException
452 * If the future failed due to an exception.
453 * @throws InterruptedException
454 * If these call is interrupted.
455 */
456 /* package */V get() throws CancellationException, ExecutionException,
457 InterruptedException {
458
459 // Acquire the shared lock allowing interruption.
460 acquireSharedInterruptibly(UNUSED);
461
462 return getValue();
463 }
464
465 /**
466 * Blocks until the future {@link #complete(Object, Throwable, int)
467 * completes} or the number of nano-seconds expires.
468 *
469 * @param nanos
470 * The number of nano-seconds to wait.
471 * @return The value set for the future.
472 * @throws TimeoutException
473 * If this call time expires.
474 * @throws CancellationException
475 * If the future was canceled.
476 * @throws ExecutionException
477 * If the future failed due to an exception.
478 * @throws InterruptedException
479 * If these call is interrupted.
480 */
481 /* package */V get(final long nanos) throws TimeoutException,
482 CancellationException, ExecutionException, InterruptedException {
483
484 // Attempt to acquire the shared lock with a timeout.
485 if (!tryAcquireSharedNanos(UNUSED, nanos)) {
486 throw new TimeoutException("Timeout waiting for task.");
487 }
488
489 return getValue();
490 }
491
492 /**
493 * Checks if the state is {@link #CANCELED} or {@link #INTERRUPTED}.
494 *
495 * @return True if the future state is {@link #CANCELED} or
496 * {@link #INTERRUPTED}.
497 */
498 /* package */boolean isCancelled() {
499 return (getState() & (CANCELED | INTERRUPTED)) != 0;
500 }
501
502 /**
503 * Checks if the state is {@link #COMPLETED}, {@link #CANCELED} or
504 * {@link #INTERRUPTED}.
505 *
506 * @return True if the future state is {@link #COMPLETED},
507 * {@link #CANCELED} or {@link #INTERRUPTED}.
508 */
509 /* package */boolean isDone() {
510 return (getState() & (COMPLETED | CANCELED | INTERRUPTED)) != 0;
511 }
512
513 /**
514 * Move to the COMPLETED state and set the value.
515 *
516 * @param value
517 * The value to set.
518 * @return If the set worked / won.
519 */
520 /* package */boolean set(final V value) {
521 return complete(value, null, COMPLETED);
522 }
523
524 /**
525 * Move to the COMPLETED state and set the exception value.
526 *
527 * @param thrown
528 * The exception to set.
529 * @return If the set worked / won.
530 */
531 /* package */boolean setException(final Throwable thrown) {
532 return complete(null, thrown, COMPLETED);
533 }
534
535 /**
536 * Completes the future.
537 *
538 * @param value
539 * The value to set as the result of the future.
540 * @param thrown
541 * the exception to set as the result of the future.
542 * @param finalState
543 * the state to transition to.
544 * @return Returns true if the completion was successful / won.
545 */
546 private boolean complete(final V value, final Throwable thrown,
547 final int finalState) {
548
549 // Move to COMPLETING to see if we are the first to complete.
550 final boolean won = compareAndSetState(RUNNING, COMPLETING);
551 if (won) {
552 this.myValue = value;
553 this.myException = ((finalState & (CANCELED | INTERRUPTED)) != 0) ? new CancellationException(
554 "Future was canceled.") : thrown;
555
556 // Release all of the waiting threads.
557 releaseShared(finalState);
558 }
559 else if (getState() == COMPLETING) {
560 // Block until done.
561 acquireShared(UNUSED);
562 }
563
564 return won;
565 }
566
567 /**
568 * Implementation to get the future's value.
569 *
570 * @return The value set for the future.
571 * @throws CancellationException
572 * If the future was canceled.
573 * @throws ExecutionException
574 * If the future failed due to an exception.
575 */
576 private V getValue() throws CancellationException, ExecutionException {
577 final int state = getState();
578 switch (state) {
579 case COMPLETED:
580 if (myException != null) {
581 throw new ExecutionException(myException);
582 }
583 return myValue;
584
585 case CANCELED:
586 case INTERRUPTED:
587 final CancellationException cancellation = new CancellationException(
588 "Future was canceled.");
589 cancellation.initCause(myException);
590
591 throw cancellation;
592
593 default:
594 throw new IllegalStateException("Sync in invalid state: "
595 + state);
596 }
597 }
598 }
599
600 }