1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client;
21
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
31
32 import com.allanbank.mongodb.BatchedAsyncMongoCollection;
33 import com.allanbank.mongodb.Callback;
34 import com.allanbank.mongodb.Durability;
35 import com.allanbank.mongodb.MongoDatabase;
36 import com.allanbank.mongodb.MongoDbException;
37 import com.allanbank.mongodb.Version;
38 import com.allanbank.mongodb.bson.Document;
39 import com.allanbank.mongodb.builder.BatchedWrite;
40 import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
41 import com.allanbank.mongodb.builder.BatchedWriteMode;
42 import com.allanbank.mongodb.client.callback.AbstractReplyCallback;
43 import com.allanbank.mongodb.client.callback.BatchedInsertCountingCallback;
44 import com.allanbank.mongodb.client.callback.BatchedWriteCallback;
45 import com.allanbank.mongodb.client.callback.ReplyCallback;
46 import com.allanbank.mongodb.client.message.Delete;
47 import com.allanbank.mongodb.client.message.GetLastError;
48 import com.allanbank.mongodb.client.message.Insert;
49 import com.allanbank.mongodb.client.message.Reply;
50 import com.allanbank.mongodb.client.message.Update;
51
52
53
54
55
56
57
58 public class BatchedAsyncMongoCollectionImpl extends
59 AbstractAsyncMongoCollection implements BatchedAsyncMongoCollection {
60
61
62 private static final Class<?>[] CLIENT_INTERFACE = new Class[] { Client.class };
63
64
65 private boolean myBatchDeletes = false;
66
67
68 private boolean myBatchUpdates = false;
69
70
71 private BatchedWriteMode myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE;
72
73
74
75
76
77
78
79
80
81
82
83 public BatchedAsyncMongoCollectionImpl(final Client client,
84 final MongoDatabase database, final String name) {
85
86 super((Client) Proxy.newProxyInstance(
87 BatchedAsyncMongoCollectionImpl.class.getClassLoader(),
88 CLIENT_INTERFACE, new CaptureClientHandler(client)), database,
89 name);
90 }
91
92
93
94
95
96
97
98 @Override
99 public void cancel() {
100 final InvocationHandler handler = Proxy.getInvocationHandler(myClient);
101 if (handler instanceof CaptureClientHandler) {
102 ((CaptureClientHandler) handler).clear();
103 }
104 }
105
106
107
108
109
110
111
112 @Override
113 public void close() throws MongoDbException {
114 flush();
115 }
116
117
118
119
120
121
122
123 @Override
124 public void flush() throws MongoDbException {
125 final InvocationHandler handler = Proxy.getInvocationHandler(myClient);
126 if (handler instanceof CaptureClientHandler) {
127 ((CaptureClientHandler) handler).flush(this);
128 }
129 }
130
131
132
133
134
135
136 public BatchedWriteMode getMode() {
137 return myMode;
138 }
139
140
141
142
143
144
145 public boolean isBatchDeletes() {
146 return myBatchDeletes;
147 }
148
149
150
151
152
153
154 public boolean isBatchUpdates() {
155 return myBatchUpdates;
156 }
157
158
159
160
161 @Override
162 public void setBatchDeletes(final boolean batchDeletes) {
163 myBatchDeletes = batchDeletes;
164 }
165
166
167
168
169 @Override
170 public void setBatchUpdates(final boolean batchUpdates) {
171 myBatchUpdates = batchUpdates;
172 }
173
174
175
176
177 @Override
178 public void setMode(final BatchedWriteMode mode) {
179 myMode = mode;
180 }
181
182
183
184
185
186
187
188
189
190 @Override
191 protected boolean useWriteCommand() {
192 return false;
193 }
194
195
196
197
198
199
200
201 private static class CaptureClientHandler implements InvocationHandler {
202
203
204 public static final Version BATCH_WRITE_VERSION = Version
205 .parse("2.5.4");
206
207
208 private BatchedAsyncMongoCollectionImpl myCollection;
209
210
211 private List<Callback<Reply>> myRealCallbacks;
212
213
214
215
216
217 private final Client myRealClient;
218
219
220 private List<Object> myResults;
221
222
223
224
225
226 private final List<Object[]> mySendArgs;
227
228
229 private final BatchedWrite.Builder myWrite;
230
231
232
233
234
235
236
237
/**
 * Creates a new CaptureClientHandler with an empty capture queue and a
 * fresh batched write builder.
 *
 * @param realClient
 *            The client that non-send calls are delegated to and that
 *            captured requests are eventually sent through.
 */
public CaptureClientHandler(final Client realClient) {
    myRealClient = realClient;

    mySendArgs = new LinkedList<Object[]>();
    myWrite = BatchedWrite.builder();

    // The per-flush state only exists while a batch is being optimized.
    myResults = null;
    myRealCallbacks = null;
}
247
248
249
250
251 public synchronized void clear() {
252 final List<Object[]> copy = new ArrayList<Object[]>(mySendArgs);
253
254 mySendArgs.clear();
255 myWrite.reset();
256
257 myResults = null;
258 myRealCallbacks = null;
259 myCollection = null;
260
261 for (final Object[] args : copy) {
262 final Object lastArg = args[args.length - 1];
263 if (lastArg instanceof Future<?>) {
264 ((Future<?>) lastArg).cancel(false);
265 }
266 else if (lastArg instanceof Callback<?>) {
267 ((Callback<?>) lastArg)
268 .exception(new CancellationException(
269 "Batch request cancelled."));
270 }
271 }
272 }
273
274
275
276
277
278
279
280 public synchronized void flush(
281 final BatchedAsyncMongoCollectionImpl collection) {
282
283
284
285 SerialClientImpl serialized;
286 if (myRealClient instanceof SerialClientImpl) {
287 serialized = (SerialClientImpl) myRealClient;
288 }
289 else {
290 serialized = new SerialClientImpl((ClientImpl) myRealClient);
291 }
292
293 try {
294
295 final List<Object> optimized = optimize(collection);
296 for (final Object toSend : optimized) {
297 if (toSend instanceof BatchedWriteCallback) {
298 final BatchedWriteCallback cb = (BatchedWriteCallback) toSend;
299 cb.setClient(serialized);
300 cb.send();
301 }
302 else if (toSend instanceof Object[]) {
303 final Object[] sendArg = (Object[]) toSend;
304 if (sendArg.length == 2) {
305 serialized.send((Message) sendArg[0],
306 (ReplyCallback) sendArg[1]);
307 }
308 else {
309 serialized.send((Message) sendArg[0],
310 (Message) sendArg[1],
311 (ReplyCallback) sendArg[2]);
312 }
313 }
314 }
315 }
316 finally {
317 clear();
318 }
319 }
320
321
322
323
324
325
326
327 @Override
328 public synchronized Object invoke(final Object proxy,
329 final Method method, final Object[] args) throws Throwable {
330
331 final String methodName = method.getName();
332
333 if (methodName.equals("send")) {
334 mySendArgs.add(args);
335 return null;
336 }
337 return method.invoke(myRealClient, args);
338 }
339
340
341
342
343
344
345
346
347
348 private void addDelete(final Delete delete, final Object[] args) {
349
350 updateDurability(args);
351
352 myRealCallbacks.add(extractCallback(args));
353 myWrite.delete(delete.getQuery(), delete.isSingleDelete());
354 }
355
356
357
358
359
360
361
362
363
364 private void addInsert(final Insert insert, final Object[] args) {
365
366 updateDurability(args);
367
368 final int docCount = insert.getDocuments().size();
369 Callback<Reply> cb = extractCallback(args);
370 final boolean breakBatch = (cb != null)
371 && insert.isContinueOnError() && (docCount > 1);
372
373 if (breakBatch) {
374 closeBatch();
375 myWrite.setMode(BatchedWriteMode.SERIALIZE_AND_STOP);
376 }
377 else {
378 cb = new BatchedInsertCountingCallback(cb, docCount);
379 }
380
381 for (final Document doc : insert.getDocuments()) {
382 myWrite.insert(doc);
383 myRealCallbacks.add(cb);
384 }
385 if (breakBatch) {
386 closeBatch();
387 }
388 }
389
390
391
392
393
394
395
396
397
398 private void addUpdate(final Update update, final Object[] args) {
399
400 updateDurability(args);
401
402 myRealCallbacks.add(extractCallback(args));
403 myWrite.update(update.getQuery(), update.getUpdate(),
404 update.isMultiUpdate(), update.isUpsert());
405 }
406
407
408
409
410
411 private void closeBatch() {
412 final ClusterStats stats = myRealClient.getClusterStats();
413 final BatchedWrite w = myWrite.build();
414 final List<Bundle> bundles = w.toBundles(myCollection.getName(),
415 stats.getSmallestMaxBsonObjectSize(),
416 stats.getSmallestMaxBatchedWriteOperations());
417 if (!bundles.isEmpty()) {
418 final BatchedWriteCallback cb = new BatchedWriteCallback(
419 myCollection.getDatabaseName(), myCollection.getName(),
420 myRealCallbacks, w, bundles);
421 myResults.add(cb);
422 }
423
424 myWrite.reset();
425 myWrite.setMode(myCollection.getMode());
426
427 myRealCallbacks.clear();
428 }
429
430
431
432
433
434
435
436
437
438
439 private Callback<Reply> extractCallback(final Object[] args) {
440 final Object cb = args[args.length - 1];
441 if (cb instanceof AbstractReplyCallback<?>) {
442 return (AbstractReplyCallback<?>) args[2];
443 }
444
445 return null;
446 }
447
448
449
450
451
452
453
454
455
456
457 private List<Object> optimize(
458 final BatchedAsyncMongoCollectionImpl collection) {
459
460 if (mySendArgs.isEmpty()) {
461 return Collections.emptyList();
462 }
463
464 final ClusterStats stats = myRealClient.getClusterStats();
465 final Version minVersion = stats.getServerVersionRange()
466 .getLowerBounds();
467 final boolean supportsBatch = BATCH_WRITE_VERSION
468 .compareTo(minVersion) <= 0;
469 if (supportsBatch) {
470 myCollection = collection;
471
472 myWrite.reset();
473 myWrite.setMode(collection.getMode());
474
475 myResults = new ArrayList<Object>(mySendArgs.size());
476 myRealCallbacks = new ArrayList<Callback<Reply>>(
477 mySendArgs.size());
478
479 while (!mySendArgs.isEmpty()) {
480 final Object[] args = mySendArgs.remove(0);
481 if (args[0] instanceof Insert) {
482 addInsert((Insert) args[0], args);
483 }
484 else if (collection.isBatchUpdates()
485 && (args[0] instanceof Update)) {
486 addUpdate((Update) args[0], args);
487 }
488 else if (collection.isBatchDeletes()
489 && (args[0] instanceof Delete)) {
490 addDelete((Delete) args[0], args);
491 }
492 else {
493 closeBatch();
494 myResults.add(args);
495 }
496
497 if (collection.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
498 closeBatch();
499 }
500 }
501
502 closeBatch();
503 }
504 else {
505 myResults = new ArrayList<Object>(mySendArgs.size());
506 myResults.addAll(mySendArgs);
507
508
509 mySendArgs.clear();
510 }
511
512 return myResults;
513 }
514
515
516
517
518
519
520
521
522
523
524 private void updateDurability(final Object[] args) {
525
526 Durability active = myWrite.getDurability();
527
528 if ((args.length == 3) && (args[1] instanceof GetLastError)) {
529 final GetLastError error = (GetLastError) args[1];
530
531 final Durability d = Durability.valueOf(error.getQuery()
532 .toString());
533
534 if (active == null) {
535 active = d;
536 myWrite.setDurability(active);
537 }
538 else if (!d.equals(active) && !d.equals(Durability.ACK)
539 && !d.equals(Durability.NONE)) {
540 closeBatch();
541 active = d;
542 myWrite.setDurability(active);
543 }
544 }
545 }
546 }
547 }