1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client;
21
22 import java.util.concurrent.CancellationException;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicReference;
29 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30
31 import com.allanbank.mongodb.Callback;
32 import com.allanbank.mongodb.ListenableFuture;
33 import com.allanbank.mongodb.LockType;
34 import com.allanbank.mongodb.client.callback.ReplyHandler;
35 import com.allanbank.mongodb.util.Assertions;
36 import com.allanbank.mongodb.util.log.Log;
37 import com.allanbank.mongodb.util.log.LogFactory;
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class FutureCallback<V> implements ListenableFuture<V>, Callback<V> {
52
53
54 public static final Log LOG = LogFactory.getLog(FutureCallback.class);
55
56
57 public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
58
59
60 private static final int SPIN_ITERATIONS = 10000;
61
62
63 private static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
64
65
66 private final LockType myLockType;
67
68
69 private AtomicReference<PendingListener> myPendingListeners;
70
71
72 private final Sync<V> mySync;
73
74
75
76
/**
 * Creates a new FutureCallback that waits using {@link LockType#MUTEX}.
 */
public FutureCallback() {
    this(LockType.MUTEX);
}
80
81
82
83
84
85
86
87
/**
 * Creates a new FutureCallback with no result and no pending listeners.
 *
 * @param lockType
 *            The waiting strategy {@link #get()} should use.
 */
public FutureCallback(final LockType lockType) {
    myLockType = lockType;
    myPendingListeners = new AtomicReference<PendingListener>(null);
    mySync = new Sync<V>();
}
93
94
95
96
97 @Override
98 public void addListener(final Runnable runnable, final Executor executor) {
99 Assertions.assertNotNull(runnable, "Runnable is null.");
100 Assertions.assertNotNull(executor, "Executor is null.");
101
102 if (!isDone()) {
103 PendingListener existing = myPendingListeners.get();
104 PendingListener listener = new PendingListener(runnable, executor,
105 existing);
106
107 while (!myPendingListeners.compareAndSet(existing, listener)) {
108 existing = myPendingListeners.get();
109 listener = new PendingListener(runnable, executor, existing);
110 }
111
112 if (isDone()) {
113 execute();
114 }
115 }
116 else {
117
118 execute(executor, runnable);
119 }
120 }
121
122
123
124
125
126
127
128
129
130
131 @Override
132 public void callback(final V result) {
133 final boolean set = mySync.set(result);
134 if (set) {
135 execute();
136 }
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150 @Override
151 public boolean cancel(final boolean mayInterruptIfRunning) {
152 if (!mySync.cancel(mayInterruptIfRunning)) {
153 return false;
154 }
155 execute();
156
157 return true;
158 }
159
160
161
162
163
164
165
166
167
168
169 @Override
170 public void exception(final Throwable thrown) {
171 Assertions.assertNotNull(thrown, "Cannot set a null exception.");
172
173 final boolean set = mySync.setException(thrown);
174 if (set) {
175 execute();
176 }
177 }
178
179
180
181
182
183
184
185
186
187 @Override
188 public V get() throws InterruptedException, ExecutionException {
189
190 if (myLockType == LockType.LOW_LATENCY_SPIN) {
191 long now = 0;
192 long spinDeadline = 1;
193 long yeildDeadline = 1;
194 while ((now < yeildDeadline) && !isDone()) {
195 for (int i = 0; (i < SPIN_ITERATIONS) && !isDone(); ++i) {
196
197 }
198
199 if (!isDone()) {
200
201 now = System.nanoTime();
202 if (spinDeadline == 1) {
203 spinDeadline = now + SPIN_TIME_NS;
204 yeildDeadline = now + YIELD_TIME_NS;
205
206
207 Thread.yield();
208 }
209 else {
210 if ((spinDeadline < now) && (now < yeildDeadline)) {
211 Thread.yield();
212 }
213 }
214 }
215 }
216 }
217
218 final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
219 while (true) {
220 try {
221
222
223 return mySync.get(shortPause);
224 }
225 catch (final TimeoutException te) {
226 ReplyHandler.tryReceive();
227 }
228 }
229 }
230
231
232
233
234 @Override
235 public V get(final long timeout, final TimeUnit unit)
236 throws InterruptedException, TimeoutException, ExecutionException {
237 long now = System.nanoTime();
238 final long deadline = now + unit.toNanos(timeout);
239 final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
240 while (true) {
241 try {
242
243 return mySync.get(Math.min((deadline - now), shortPause));
244 }
245 catch (final TimeoutException te) {
246
247 now = System.nanoTime();
248 if (now < deadline) {
249 ReplyHandler.tryReceive();
250 }
251 else {
252 throw te;
253 }
254 }
255 }
256 }
257
258
259
260
261
262
263
264
265
266 @Override
267 public boolean isCancelled() {
268 return mySync.isCancelled();
269 }
270
271
272
273
274
275
276
277
278
279
280 @Override
281 public boolean isDone() {
282 return mySync.isDone();
283 }
284
285
286
287
288
289
290
291 protected void execute() {
292 PendingListener toRun;
293 PendingListener next;
294
295
296 do {
297
298
299 do {
300 toRun = myPendingListeners.get();
301 next = (toRun != null) ? toRun.myNext : null;
302 }
303 while (!myPendingListeners.compareAndSet(toRun, next));
304
305
306 if (toRun != null) {
307 execute(toRun.myExecutor, toRun.myRunnable);
308 }
309 }
310 while (toRun != null);
311 }
312
313
314
315
316
317
318
319
320
321
322 private void execute(final Executor executor, final Runnable runnable) {
323 try {
324 executor.execute(runnable);
325 }
326 catch (final RuntimeException e) {
327 LOG.error(e, "Exception running a FutureListener's runnable {} "
328 + "with executor {}", runnable, executor);
329 }
330 }
331
332
333
334
335
336
337 static final class PendingListener {
338
339
340 final Executor myExecutor;
341
342
343 final PendingListener myNext;
344
345
346 final Runnable myRunnable;
347
348
349
350
351
352
353
354
355
356
357
358 PendingListener(final Runnable runnable,
359 final Executor executor, final PendingListener next) {
360 myRunnable = runnable;
361 myExecutor = executor;
362 myNext = next;
363 }
364 }
365
366
367
368
369
370
371
372
373
374
375 static final class Sync<V> extends AbstractQueuedSynchronizer {
376
377
378 static final int CANCELED = 4;
379
380
381 static final int COMPLETED = 2;
382
383
384 static final int COMPLETING = 1;
385
386
387 static final int INTERRUPTED = 8;
388
389
390 static final int RUNNING = 0;
391
392
393 static final int UNUSED = -1;
394
395
396 private static final long serialVersionUID = -9189950787072982459L;
397
398
399 private Throwable myException;
400
401
402 private V myValue;
403
404
405
406
407 Sync() {
408 myValue = null;
409 myException = null;
410 }
411
412
413
414
415 @Override
416 protected int tryAcquireShared(final int ignored) {
417 if (isDone()) {
418 return 1;
419 }
420 return -1;
421 }
422
423
424
425
426
427 @Override
428 protected boolean tryReleaseShared(final int finalState) {
429 setState(finalState);
430 return true;
431 }
432
433
434
435
436
437
438
439
440 boolean cancel(final boolean interrupt) {
441 return complete(null, null, interrupt ? INTERRUPTED : CANCELED);
442 }
443
444
445
446
447
448
449
450
451
452
453
454
455
456 V get() throws CancellationException, ExecutionException,
457 InterruptedException {
458
459
460 acquireSharedInterruptibly(UNUSED);
461
462 return getValue();
463 }
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481 V get(final long nanos) throws TimeoutException,
482 CancellationException, ExecutionException, InterruptedException {
483
484
485 if (!tryAcquireSharedNanos(UNUSED, nanos)) {
486 throw new TimeoutException("Timeout waiting for task.");
487 }
488
489 return getValue();
490 }
491
492
493
494
495
496
497
498 boolean isCancelled() {
499 return (getState() & (CANCELED | INTERRUPTED)) != 0;
500 }
501
502
503
504
505
506
507
508
509 boolean isDone() {
510 return (getState() & (COMPLETED | CANCELED | INTERRUPTED)) != 0;
511 }
512
513
514
515
516
517
518
519
520 boolean set(final V value) {
521 return complete(value, null, COMPLETED);
522 }
523
524
525
526
527
528
529
530
531 boolean setException(final Throwable thrown) {
532 return complete(null, thrown, COMPLETED);
533 }
534
535
536
537
538
539
540
541
542
543
544
545
546 private boolean complete(final V value, final Throwable thrown,
547 final int finalState) {
548
549
550 final boolean won = compareAndSetState(RUNNING, COMPLETING);
551 if (won) {
552 this.myValue = value;
553 this.myException = ((finalState & (CANCELED | INTERRUPTED)) != 0) ? new CancellationException(
554 "Future was canceled.") : thrown;
555
556
557 releaseShared(finalState);
558 }
559 else if (getState() == COMPLETING) {
560
561 acquireShared(UNUSED);
562 }
563
564 return won;
565 }
566
567
568
569
570
571
572
573
574
575
576 private V getValue() throws CancellationException, ExecutionException {
577 final int state = getState();
578 switch (state) {
579 case COMPLETED:
580 if (myException != null) {
581 throw new ExecutionException(myException);
582 }
583 return myValue;
584
585 case CANCELED:
586 case INTERRUPTED:
587 final CancellationException cancellation = new CancellationException(
588 "Future was canceled.");
589 cancellation.initCause(myException);
590
591 throw cancellation;
592
593 default:
594 throw new IllegalStateException("Sync in invalid state: "
595 + state);
596 }
597 }
598 }
599
600 }