1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.IdentityHashMap;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30
31 import com.allanbank.mongodb.Callback;
32 import com.allanbank.mongodb.Durability;
33 import com.allanbank.mongodb.MongoDbException;
34 import com.allanbank.mongodb.bson.Document;
35 import com.allanbank.mongodb.bson.Element;
36 import com.allanbank.mongodb.bson.NumericElement;
37 import com.allanbank.mongodb.bson.builder.BuilderFactory;
38 import com.allanbank.mongodb.bson.element.ArrayElement;
39 import com.allanbank.mongodb.bson.element.DocumentElement;
40 import com.allanbank.mongodb.builder.BatchedWrite;
41 import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
42 import com.allanbank.mongodb.builder.BatchedWriteMode;
43 import com.allanbank.mongodb.builder.write.WriteOperation;
44 import com.allanbank.mongodb.client.Client;
45 import com.allanbank.mongodb.client.message.BatchedWriteCommand;
46 import com.allanbank.mongodb.client.message.Command;
47 import com.allanbank.mongodb.client.message.Reply;
48 import com.allanbank.mongodb.error.BatchedWriteException;
49 import com.allanbank.mongodb.util.Assertions;
50
51
52
53
54
55
56
57
58 public class BatchedWriteCallback extends ReplyLongCallback {
59
60
61 private final List<BatchedWrite.Bundle> myBundles;
62
63
64 private Client myClient;
65
66
67 private final String myCollectionName;
68
69
70 private final String myDatabaseName;
71
72
73 private final Map<WriteOperation, Throwable> myFailedOperations;
74
75
76 private int myFinished;
77
78
79 private long myN = 0;
80
81
82 private final List<BatchedWrite.Bundle> myPendingBundles;
83
84
85 private final List<Callback<Reply>> myRealCallbacks;
86
87
88 private List<WriteOperation> mySkippedOperations;
89
90
91 private final BatchedWrite myWrite;
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 public BatchedWriteCallback(final String databaseName,
110 final String collectionName, final Callback<Long> results,
111 final BatchedWrite write, final Client client,
112 final List<BatchedWrite.Bundle> bundles) {
113 super(results);
114
115 myDatabaseName = databaseName;
116 myCollectionName = collectionName;
117 myWrite = write;
118 myClient = client;
119 myBundles = Collections
120 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
121
122 myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
123
124 myFinished = 0;
125 myN = 0;
126
127 myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
128 mySkippedOperations = null;
129
130 myRealCallbacks = Collections.emptyList();
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 public BatchedWriteCallback(final String databaseName,
148 final String collectionName,
149 final List<Callback<Reply>> realCallbacks,
150 final BatchedWrite write, final List<Bundle> bundles) {
151 super(null);
152
153 myDatabaseName = databaseName;
154 myCollectionName = collectionName;
155 myWrite = write;
156 myClient = null;
157 myBundles = Collections
158 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
159
160 myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
161
162 myFinished = 0;
163 myN = 0;
164
165 myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
166 mySkippedOperations = null;
167
168 myRealCallbacks = new ArrayList<Callback<Reply>>(realCallbacks);
169
170 int count = 0;
171 for (final Bundle b : myBundles) {
172 count += b.getWrites().size();
173 }
174 Assertions.assertThat(
175 myRealCallbacks.size() == count,
176 "There nust be an operation (" + count
177 + ") in a bundle for each callback ("
178 + myRealCallbacks.size() + ").");
179 }
180
181
182
183
184 public void send() {
185
186 List<BatchedWrite.Bundle> toSendBundles;
187 synchronized (this) {
188 List<BatchedWrite.Bundle> toSend = myPendingBundles;
189 if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
190 toSend = myPendingBundles.subList(0, 1);
191 }
192
193
194
195 toSendBundles = new ArrayList<BatchedWrite.Bundle>(toSend);
196 toSend.clear();
197 }
198
199
200
201
202
203 for (final BatchedWrite.Bundle bundle : toSendBundles) {
204 final Command commandMsg = new BatchedWriteCommand(myDatabaseName,
205 myCollectionName, bundle);
206
207
208 commandMsg.setAllowJumbo(true);
209
210 if (myWrite.getDurability() == Durability.NONE) {
211
212 final Document doc = BuilderFactory.start().add("ok", 1)
213 .add("n", -1).build();
214 final Reply reply = new Reply(0, 0, 0,
215 Collections.singletonList(doc), false, false, false,
216 false);
217
218 myClient.send(commandMsg, NoOpCallback.NO_OP);
219 publish(bundle, reply);
220 }
221 else {
222 myClient.send(commandMsg, new BundleCallback(bundle));
223
224 }
225 }
226
227 if ((myWrite.getDurability() == Durability.NONE)
228 && myPendingBundles.isEmpty() && (myForwardCallback != null)) {
229 myForwardCallback.callback(-1L);
230 }
231 }
232
233
234
235
236
237
238
239 public void setClient(final Client client) {
240 myClient = client;
241 }
242
243
244
245
246
247
248
249
250
251 protected synchronized void callback(final Bundle bundle, final Reply result) {
252 final MongoDbException error = asError(result);
253 if (error != null) {
254
255 exception(bundle, error);
256 }
257 else {
258 myFinished += 1;
259 myN += convert(result).longValue();
260
261
262 final boolean failed = failedDurability(bundle, result)
263 | failedWrites(bundle, result);
264
265 publish(bundle, result);
266
267 if (failed) {
268 publishResults();
269 }
270 else if (!myPendingBundles.isEmpty()) {
271 send();
272 }
273 else if (myFinished == myBundles.size()) {
274 publishResults();
275 }
276 }
277 }
278
279
280
281
282
283
284
285
286
287
288 protected synchronized void exception(final Bundle bundle,
289 final Throwable thrown) {
290 myFinished += 1;
291 for (final WriteOperation operation : bundle.getWrites()) {
292 myFailedOperations.put(operation, thrown);
293 }
294
295
296
297 if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
298 publishResults();
299 }
300 else if (myFinished == myBundles.size()) {
301 publishResults();
302 }
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317 private boolean failedDurability(final Bundle bundle, final Reply reply) {
318 final List<Document> results = reply.getResults();
319 if (results.size() == 1) {
320 final Document doc = results.get(0);
321 final DocumentElement error = doc.get(DocumentElement.class,
322 "writeConcernError");
323 if (error != null) {
324 final int code = toInt(error.get(NumericElement.class, "code"));
325 final String errmsg = asString(error.get(Element.class,
326 "errmsg"));
327 final MongoDbException exception = asError(reply, 0, code,
328 true, errmsg, null);
329 for (final WriteOperation op : bundle.getWrites()) {
330 myFailedOperations.put(op, exception);
331 }
332 }
333 }
334
335 return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
336 && !myFailedOperations.isEmpty();
337 }
338
339
340
341
342
343
344
345
346
347
348
349
350 private boolean failedWrites(final Bundle bundle, final Reply reply) {
351 final List<Document> results = reply.getResults();
352 if (results.size() == 1) {
353 final Document doc = results.get(0);
354 final ArrayElement errors = doc.get(ArrayElement.class,
355 "writeErrors");
356 if (errors != null) {
357 final List<WriteOperation> operations = bundle.getWrites();
358 for (final DocumentElement error : errors.find(
359 DocumentElement.class, ".*")) {
360 final int index = toInt(error.get(NumericElement.class,
361 "index"));
362 final int code = toInt(error.get(NumericElement.class,
363 "code"));
364 final String errmsg = asString(error.get(Element.class,
365 "errmsg"));
366
367 if ((0 <= index) && (index < operations.size())) {
368 final WriteOperation op = operations.get(index);
369
370 myFailedOperations.put(op,
371 asError(reply, 0, code, false, errmsg, null));
372
373 if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
374 mySkippedOperations = new ArrayList<WriteOperation>();
375 mySkippedOperations.addAll(operations.subList(
376 index + 1, operations.size()));
377 }
378 }
379 }
380 }
381 }
382
383 return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
384 && !myFailedOperations.isEmpty();
385 }
386
387
388
389
390
391
392
393
394
395 private void publish(final Bundle bundle, final Reply reply) {
396 if (myForwardCallback == null) {
397
398 int index = 0;
399 for (final Bundle b : myBundles) {
400 final List<WriteOperation> writes = b.getWrites();
401 final int count = writes.size();
402
403
404
405 if (b == bundle) {
406 for (int i = 0; i < count; ++i) {
407
408 final Throwable t = myFailedOperations.get(writes
409 .get(i));
410 final Callback<Reply> cb = myRealCallbacks.set(index
411 + i, NoOpCallback.NO_OP);
412 if (cb != null) {
413 if (t == null) {
414
415 cb.callback(reply);
416 }
417 else {
418 cb.exception(t);
419 }
420 }
421 }
422 break;
423 }
424
425 index += count;
426 }
427 }
428 }
429
430
431
432
433 private void publishResults() {
434 if (myFailedOperations.isEmpty()) {
435 if (myForwardCallback != null) {
436 myForwardCallback.callback(Long.valueOf(myN));
437 }
438
439
440 }
441 else {
442 if (mySkippedOperations == null) {
443 mySkippedOperations = new ArrayList<WriteOperation>();
444 }
445 for (final Bundle pending : myPendingBundles) {
446 mySkippedOperations.addAll(pending.getWrites());
447 }
448
449 if (myForwardCallback != null) {
450
451
452 if ((myBundles.size() == 1)
453 && (myBundles.get(0).getWrites().size() == 1)
454 && (myFailedOperations.size() == 1)) {
455 myForwardCallback.exception(myFailedOperations.values()
456 .iterator().next());
457 }
458 else {
459 myForwardCallback.exception(new BatchedWriteException(
460 myWrite, myN, mySkippedOperations,
461 myFailedOperations));
462 }
463 }
464 else {
465
466 final List<WriteOperation> emptySkipped = Collections
467 .emptyList();
468 final Map<WriteOperation, Throwable> emptyError = Collections
469 .emptyMap();
470
471
472 final Set<WriteOperation> skipped = Collections
473 .newSetFromMap(new IdentityHashMap<WriteOperation, Boolean>());
474 skipped.addAll(mySkippedOperations);
475
476 final Document doc = BuilderFactory.start().add("ok", 1)
477 .add("n", myN).build();
478 final Reply reply = new Reply(0, 0, 0,
479 Collections.singletonList(doc), false, false, false,
480 false);
481
482 int index = 0;
483 for (final Bundle b : myBundles) {
484 for (final WriteOperation op : b.getWrites()) {
485 final Callback<Reply> cb = myRealCallbacks.get(index);
486
487 if (cb != null) {
488
489 final Throwable thrown = myFailedOperations.get(op);
490 if (thrown != null) {
491 cb.exception(new BatchedWriteException(myWrite,
492 myN, emptySkipped, Collections
493 .singletonMap(op, thrown)));
494 }
495 else if (skipped.contains(op)) {
496
497 cb.exception(new BatchedWriteException(myWrite,
498 myN, Collections.singletonList(op),
499 emptyError));
500 }
501 else {
502
503 cb.callback(reply);
504 }
505 }
506
507
508 index += 1;
509 }
510 }
511 }
512 }
513 }
514
515
516
517
518
519
520
521
522
523 class BundleCallback implements ReplyCallback {
524
525
526
527
528 private final Bundle myBundle;
529
530
531
532
533
534
535
536
537 public BundleCallback(final Bundle bundle) {
538 myBundle = bundle;
539 }
540
541
542
543
544
545
546
547 @Override
548 public void callback(final Reply result) {
549 BatchedWriteCallback.this.callback(myBundle, result);
550 }
551
552
553
554
555
556
557
558 @Override
559 public void exception(final Throwable thrown) {
560 BatchedWriteCallback.this.exception(myBundle, thrown);
561 }
562
563
564
565
566
567
568
569 @Override
570 public boolean isLightWeight() {
571 return false;
572 }
573 }
574 }