1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
|
20 | |
package com.allanbank.mongodb.client; |
21 | |
|
22 | |
import java.lang.reflect.InvocationHandler; |
23 | |
import java.lang.reflect.Method; |
24 | |
import java.lang.reflect.Proxy; |
25 | |
import java.util.ArrayList; |
26 | |
import java.util.Collections; |
27 | |
import java.util.LinkedList; |
28 | |
import java.util.List; |
29 | |
import java.util.concurrent.CancellationException; |
30 | |
import java.util.concurrent.Future; |
31 | |
|
32 | |
import com.allanbank.mongodb.BatchedAsyncMongoCollection; |
33 | |
import com.allanbank.mongodb.Callback; |
34 | |
import com.allanbank.mongodb.Durability; |
35 | |
import com.allanbank.mongodb.MongoDatabase; |
36 | |
import com.allanbank.mongodb.MongoDbException; |
37 | |
import com.allanbank.mongodb.Version; |
38 | |
import com.allanbank.mongodb.bson.Document; |
39 | |
import com.allanbank.mongodb.builder.BatchedWrite; |
40 | |
import com.allanbank.mongodb.builder.BatchedWrite.Bundle; |
41 | |
import com.allanbank.mongodb.builder.BatchedWriteMode; |
42 | |
import com.allanbank.mongodb.client.callback.AbstractReplyCallback; |
43 | |
import com.allanbank.mongodb.client.callback.BatchedInsertCountingCallback; |
44 | |
import com.allanbank.mongodb.client.callback.BatchedWriteCallback; |
45 | |
import com.allanbank.mongodb.client.callback.ReplyCallback; |
46 | |
import com.allanbank.mongodb.client.message.Delete; |
47 | |
import com.allanbank.mongodb.client.message.GetLastError; |
48 | |
import com.allanbank.mongodb.client.message.Insert; |
49 | |
import com.allanbank.mongodb.client.message.Reply; |
50 | |
import com.allanbank.mongodb.client.message.Update; |
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
public class BatchedAsyncMongoCollectionImpl extends |
59 | |
AbstractAsyncMongoCollection implements BatchedAsyncMongoCollection { |
60 | |
|
61 | |
|
62 | 1 | private static final Class<?>[] CLIENT_INTERFACE = new Class[] { Client.class }; |
63 | |
|
64 | |
|
65 | 11 | private boolean myBatchDeletes = false; |
66 | |
|
67 | |
|
68 | 11 | private boolean myBatchUpdates = false; |
69 | |
|
70 | |
|
71 | 11 | private BatchedWriteMode myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE; |
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
public BatchedAsyncMongoCollectionImpl(final Client client, |
84 | |
final MongoDatabase database, final String name) { |
85 | |
|
86 | 11 | super((Client) Proxy.newProxyInstance( |
87 | |
BatchedAsyncMongoCollectionImpl.class.getClassLoader(), |
88 | |
CLIENT_INTERFACE, new CaptureClientHandler(client)), database, |
89 | |
name); |
90 | 11 | } |
91 | |
|
92 | |
|
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
@Override |
99 | |
public void cancel() { |
100 | 1 | final InvocationHandler handler = Proxy.getInvocationHandler(myClient); |
101 | 1 | if (handler instanceof CaptureClientHandler) { |
102 | 1 | ((CaptureClientHandler) handler).clear(); |
103 | |
} |
104 | 1 | } |
105 | |
|
106 | |
|
107 | |
|
108 | |
|
109 | |
|
110 | |
|
111 | |
|
112 | |
@Override |
113 | |
public void close() throws MongoDbException { |
114 | 4 | flush(); |
115 | 4 | } |
116 | |
|
117 | |
|
118 | |
|
119 | |
|
120 | |
|
121 | |
|
122 | |
|
123 | |
@Override |
124 | |
public void flush() throws MongoDbException { |
125 | 11 | final InvocationHandler handler = Proxy.getInvocationHandler(myClient); |
126 | 11 | if (handler instanceof CaptureClientHandler) { |
127 | 11 | ((CaptureClientHandler) handler).flush(this); |
128 | |
} |
129 | 11 | } |
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
|
136 | |
public BatchedWriteMode getMode() { |
137 | 37 | return myMode; |
138 | |
} |
139 | |
|
140 | |
|
141 | |
|
142 | |
|
143 | |
|
144 | |
|
145 | |
public boolean isBatchDeletes() { |
146 | 8 | return myBatchDeletes; |
147 | |
} |
148 | |
|
149 | |
|
150 | |
|
151 | |
|
152 | |
|
153 | |
|
154 | |
public boolean isBatchUpdates() { |
155 | 14 | return myBatchUpdates; |
156 | |
} |
157 | |
|
158 | |
|
159 | |
|
160 | |
|
161 | |
@Override |
162 | |
public void setBatchDeletes(final boolean batchDeletes) { |
163 | 4 | myBatchDeletes = batchDeletes; |
164 | 4 | } |
165 | |
|
166 | |
|
167 | |
|
168 | |
|
169 | |
@Override |
170 | |
public void setBatchUpdates(final boolean batchUpdates) { |
171 | 4 | myBatchUpdates = batchUpdates; |
172 | 4 | } |
173 | |
|
174 | |
|
175 | |
|
176 | |
|
177 | |
@Override |
178 | |
public void setMode(final BatchedWriteMode mode) { |
179 | 0 | myMode = mode; |
180 | 0 | } |
181 | |
|
182 | |
|
183 | |
|
184 | |
|
185 | |
|
186 | |
|
187 | |
|
188 | |
|
189 | |
|
190 | |
@Override |
191 | |
protected boolean useWriteCommand() { |
192 | 34 | return false; |
193 | |
} |
194 | |
|
195 | |
|
196 | |
|
197 | |
|
198 | |
|
199 | |
|
200 | |
|
201 | |
private static class CaptureClientHandler implements InvocationHandler { |
202 | |
|
203 | |
|
204 | 1 | public static final Version BATCH_WRITE_VERSION = Version |
205 | |
.parse("2.5.4"); |
206 | |
|
207 | |
|
208 | |
private BatchedAsyncMongoCollectionImpl myCollection; |
209 | |
|
210 | |
|
211 | |
private List<Callback<Reply>> myRealCallbacks; |
212 | |
|
213 | |
|
214 | |
|
215 | |
|
216 | |
|
217 | |
private final Client myRealClient; |
218 | |
|
219 | |
|
220 | |
private List<Object> myResults; |
221 | |
|
222 | |
|
223 | |
|
224 | |
|
225 | |
|
226 | |
private final List<Object[]> mySendArgs; |
227 | |
|
228 | |
|
229 | |
private final BatchedWrite.Builder myWrite; |
230 | |
|
231 | |
|
232 | |
|
233 | |
|
234 | |
|
235 | |
|
236 | |
|
237 | |
|
238 | 11 | public CaptureClientHandler(final Client realClient) { |
239 | 11 | myRealClient = realClient; |
240 | |
|
241 | 11 | myRealCallbacks = null; |
242 | 11 | myResults = null; |
243 | |
|
244 | 11 | mySendArgs = new LinkedList<Object[]>(); |
245 | 11 | myWrite = BatchedWrite.builder(); |
246 | 11 | } |
247 | |
|
248 | |
|
249 | |
|
250 | |
|
251 | |
public synchronized void clear() { |
252 | 12 | final List<Object[]> copy = new ArrayList<Object[]>(mySendArgs); |
253 | |
|
254 | 12 | mySendArgs.clear(); |
255 | 12 | myWrite.reset(); |
256 | |
|
257 | 12 | myResults = null; |
258 | 12 | myRealCallbacks = null; |
259 | 12 | myCollection = null; |
260 | |
|
261 | 12 | for (final Object[] args : copy) { |
262 | 4 | final Object lastArg = args[args.length - 1]; |
263 | 4 | if (lastArg instanceof Future<?>) { |
264 | 0 | ((Future<?>) lastArg).cancel(false); |
265 | |
} |
266 | 4 | else if (lastArg instanceof Callback<?>) { |
267 | 4 | ((Callback<?>) lastArg) |
268 | |
.exception(new CancellationException( |
269 | |
"Batch request cancelled.")); |
270 | |
} |
271 | 4 | } |
272 | 12 | } |
273 | |
|
274 | |
|
275 | |
|
276 | |
|
277 | |
|
278 | |
|
279 | |
|
280 | |
public synchronized void flush( |
281 | |
final BatchedAsyncMongoCollectionImpl collection) { |
282 | |
|
283 | |
|
284 | |
|
285 | |
SerialClientImpl serialized; |
286 | 11 | if (myRealClient instanceof SerialClientImpl) { |
287 | 11 | serialized = (SerialClientImpl) myRealClient; |
288 | |
} |
289 | |
else { |
290 | 0 | serialized = new SerialClientImpl((ClientImpl) myRealClient); |
291 | |
} |
292 | |
|
293 | |
try { |
294 | |
|
295 | 11 | final List<Object> optimized = optimize(collection); |
296 | 11 | for (final Object toSend : optimized) { |
297 | 18 | if (toSend instanceof BatchedWriteCallback) { |
298 | 7 | final BatchedWriteCallback cb = (BatchedWriteCallback) toSend; |
299 | 7 | cb.setClient(serialized); |
300 | 7 | cb.send(); |
301 | 7 | } |
302 | 11 | else if (toSend instanceof Object[]) { |
303 | 11 | final Object[] sendArg = (Object[]) toSend; |
304 | 11 | if (sendArg.length == 2) { |
305 | 0 | serialized.send((Message) sendArg[0], |
306 | |
(ReplyCallback) sendArg[1]); |
307 | |
} |
308 | |
else { |
309 | 11 | serialized.send((Message) sendArg[0], |
310 | |
(Message) sendArg[1], |
311 | |
(ReplyCallback) sendArg[2]); |
312 | |
} |
313 | |
} |
314 | 18 | } |
315 | |
} |
316 | |
finally { |
317 | 11 | clear(); |
318 | 11 | } |
319 | 11 | } |
320 | |
|
321 | |
|
322 | |
|
323 | |
|
324 | |
|
325 | |
|
326 | |
|
327 | |
@Override |
328 | |
public synchronized Object invoke(final Object proxy, |
329 | |
final Method method, final Object[] args) throws Throwable { |
330 | |
|
331 | 87 | final String methodName = method.getName(); |
332 | |
|
333 | 87 | if (methodName.equals("send")) { |
334 | 34 | mySendArgs.add(args); |
335 | 34 | return null; |
336 | |
} |
337 | 53 | return method.invoke(myRealClient, args); |
338 | |
} |
339 | |
|
340 | |
|
341 | |
|
342 | |
|
343 | |
|
344 | |
|
345 | |
|
346 | |
|
347 | |
|
348 | |
private void addDelete(final Delete delete, final Object[] args) { |
349 | |
|
350 | 6 | updateDurability(args); |
351 | |
|
352 | 6 | myRealCallbacks.add(extractCallback(args)); |
353 | 6 | myWrite.delete(delete.getQuery(), delete.isSingleDelete()); |
354 | 6 | } |
355 | |
|
356 | |
|
357 | |
|
358 | |
|
359 | |
|
360 | |
|
361 | |
|
362 | |
|
363 | |
|
364 | |
private void addInsert(final Insert insert, final Object[] args) { |
365 | |
|
366 | 7 | updateDurability(args); |
367 | |
|
368 | 7 | final int docCount = insert.getDocuments().size(); |
369 | 7 | Callback<Reply> cb = extractCallback(args); |
370 | 7 | final boolean breakBatch = (cb != null) |
371 | |
&& insert.isContinueOnError() && (docCount > 1); |
372 | |
|
373 | 7 | if (breakBatch) { |
374 | 0 | closeBatch(); |
375 | 0 | myWrite.setMode(BatchedWriteMode.SERIALIZE_AND_STOP); |
376 | |
} |
377 | |
else { |
378 | 7 | cb = new BatchedInsertCountingCallback(cb, docCount); |
379 | |
} |
380 | |
|
381 | 7 | for (final Document doc : insert.getDocuments()) { |
382 | 7 | myWrite.insert(doc); |
383 | 7 | myRealCallbacks.add(cb); |
384 | 7 | } |
385 | 7 | if (breakBatch) { |
386 | 0 | closeBatch(); |
387 | |
} |
388 | 7 | } |
389 | |
|
390 | |
|
391 | |
|
392 | |
|
393 | |
|
394 | |
|
395 | |
|
396 | |
|
397 | |
|
398 | |
private void addUpdate(final Update update, final Object[] args) { |
399 | |
|
400 | 6 | updateDurability(args); |
401 | |
|
402 | 6 | myRealCallbacks.add(extractCallback(args)); |
403 | 6 | myWrite.update(update.getQuery(), update.getUpdate(), |
404 | |
update.isMultiUpdate(), update.isUpsert()); |
405 | 6 | } |
406 | |
|
407 | |
|
408 | |
|
409 | |
|
410 | |
|
411 | |
private void closeBatch() { |
412 | 9 | final ClusterStats stats = myRealClient.getClusterStats(); |
413 | 9 | final BatchedWrite w = myWrite.build(); |
414 | 9 | final List<Bundle> bundles = w.toBundles(myCollection.getName(), |
415 | |
stats.getSmallestMaxBsonObjectSize(), |
416 | |
stats.getSmallestMaxBatchedWriteOperations()); |
417 | 9 | if (!bundles.isEmpty()) { |
418 | 7 | final BatchedWriteCallback cb = new BatchedWriteCallback( |
419 | |
myCollection.getDatabaseName(), myCollection.getName(), |
420 | |
myRealCallbacks, w, bundles); |
421 | 7 | myResults.add(cb); |
422 | |
} |
423 | |
|
424 | 9 | myWrite.reset(); |
425 | 9 | myWrite.setMode(myCollection.getMode()); |
426 | |
|
427 | 9 | myRealCallbacks.clear(); |
428 | 9 | } |
429 | |
|
430 | |
|
431 | |
|
432 | |
|
433 | |
|
434 | |
|
435 | |
|
436 | |
|
437 | |
|
438 | |
|
439 | |
private Callback<Reply> extractCallback(final Object[] args) { |
440 | 19 | final Object cb = args[args.length - 1]; |
441 | 19 | if (cb instanceof AbstractReplyCallback<?>) { |
442 | 19 | return (AbstractReplyCallback<?>) args[2]; |
443 | |
} |
444 | |
|
445 | 0 | return null; |
446 | |
} |
447 | |
|
448 | |
|
449 | |
|
450 | |
|
451 | |
|
452 | |
|
453 | |
|
454 | |
|
455 | |
|
456 | |
|
457 | |
private List<Object> optimize( |
458 | |
final BatchedAsyncMongoCollectionImpl collection) { |
459 | |
|
460 | 11 | if (mySendArgs.isEmpty()) { |
461 | 1 | return Collections.emptyList(); |
462 | |
} |
463 | |
|
464 | 10 | final ClusterStats stats = myRealClient.getClusterStats(); |
465 | 10 | final Version minVersion = stats.getServerVersionRange() |
466 | |
.getLowerBounds(); |
467 | 10 | final boolean supportsBatch = BATCH_WRITE_VERSION |
468 | |
.compareTo(minVersion) <= 0; |
469 | 10 | if (supportsBatch) { |
470 | 7 | myCollection = collection; |
471 | |
|
472 | 7 | myWrite.reset(); |
473 | 7 | myWrite.setMode(collection.getMode()); |
474 | |
|
475 | 7 | myResults = new ArrayList<Object>(mySendArgs.size()); |
476 | 7 | myRealCallbacks = new ArrayList<Callback<Reply>>( |
477 | |
mySendArgs.size()); |
478 | |
|
479 | 28 | while (!mySendArgs.isEmpty()) { |
480 | 21 | final Object[] args = mySendArgs.remove(0); |
481 | 21 | if (args[0] instanceof Insert) { |
482 | 7 | addInsert((Insert) args[0], args); |
483 | |
} |
484 | 14 | else if (collection.isBatchUpdates() |
485 | |
&& (args[0] instanceof Update)) { |
486 | 6 | addUpdate((Update) args[0], args); |
487 | |
} |
488 | 8 | else if (collection.isBatchDeletes() |
489 | |
&& (args[0] instanceof Delete)) { |
490 | 6 | addDelete((Delete) args[0], args); |
491 | |
} |
492 | |
else { |
493 | 2 | closeBatch(); |
494 | 2 | myResults.add(args); |
495 | |
} |
496 | |
|
497 | 21 | if (collection.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) { |
498 | 0 | closeBatch(); |
499 | |
} |
500 | 21 | } |
501 | |
|
502 | 7 | closeBatch(); |
503 | |
} |
504 | |
else { |
505 | 3 | myResults = new ArrayList<Object>(mySendArgs.size()); |
506 | 3 | myResults.addAll(mySendArgs); |
507 | |
|
508 | |
|
509 | 3 | mySendArgs.clear(); |
510 | |
} |
511 | |
|
512 | 10 | return myResults; |
513 | |
} |
514 | |
|
515 | |
|
516 | |
|
517 | |
|
518 | |
|
519 | |
|
520 | |
|
521 | |
|
522 | |
|
523 | |
|
524 | |
private void updateDurability(final Object[] args) { |
525 | |
|
526 | 19 | Durability active = myWrite.getDurability(); |
527 | |
|
528 | 19 | if ((args.length == 3) && (args[1] instanceof GetLastError)) { |
529 | 19 | final GetLastError error = (GetLastError) args[1]; |
530 | |
|
531 | 19 | final Durability d = Durability.valueOf(error.getQuery() |
532 | |
.toString()); |
533 | |
|
534 | 19 | if (active == null) { |
535 | 7 | active = d; |
536 | 7 | myWrite.setDurability(active); |
537 | |
} |
538 | 12 | else if (!d.equals(active) && !d.equals(Durability.ACK) |
539 | |
&& !d.equals(Durability.NONE)) { |
540 | 0 | closeBatch(); |
541 | 0 | active = d; |
542 | 0 | myWrite.setDurability(active); |
543 | |
} |
544 | |
} |
545 | 19 | } |
546 | |
} |
547 | |
} |