1 /*
2 * #%L
3 * BatchedWrite.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.builder;
22
23 import java.io.Serializable;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.SortedMap;
31 import java.util.TreeMap;
32
33 import com.allanbank.mongodb.BatchedAsyncMongoCollection;
34 import com.allanbank.mongodb.Durability;
35 import com.allanbank.mongodb.MongoCollection;
36 import com.allanbank.mongodb.Version;
37 import com.allanbank.mongodb.bson.Document;
38 import com.allanbank.mongodb.bson.DocumentAssignable;
39 import com.allanbank.mongodb.bson.Element;
40 import com.allanbank.mongodb.bson.builder.ArrayBuilder;
41 import com.allanbank.mongodb.bson.builder.BuilderFactory;
42 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
43 import com.allanbank.mongodb.bson.impl.EmptyDocument;
44 import com.allanbank.mongodb.builder.write.DeleteOperation;
45 import com.allanbank.mongodb.builder.write.InsertOperation;
46 import com.allanbank.mongodb.builder.write.UpdateOperation;
47 import com.allanbank.mongodb.builder.write.WriteOperation;
48 import com.allanbank.mongodb.builder.write.WriteOperationType;
49 import com.allanbank.mongodb.error.DocumentToLargeException;
50
51 /**
52 * BatchedWrite provides a container for a group of write operations to be sent
53 * to the server as one group.
54 * <p>
55 * The default mode ({@link BatchedWriteMode#SERIALIZE_AND_CONTINUE}) for this
56 * class is to submit the operations to the server in the order that they are
57 * added to the Builder and to apply as many of the writes as possible (commonly
58 * referred to as continue-on-error). This has the effect of causing the fewest
59 * surprises and optimizing the performance of the writes since the driver can
60 * send multiple distinct writes to the server at once.
61 * </p>
62 * <p>
63 * The {@link BatchedWriteMode#SERIALIZE_AND_STOP} mode also sends each write as
64 * a separate request but instead of attempting all writes the driver will stop
65 * sending requests once one of the writes fails. This also prevents the driver
66 * from sending multiple write messages to the server which can degrade
67 * performance.
68 * </p>
69 * <p>
70 * The last mode, {@link BatchedWriteMode#REORDERED}, may re-order writes to
71 * maximize performance. Similar to the
72 * {@link BatchedWriteMode#SERIALIZE_AND_CONTINUE} this mode will also attempt
73 * all writes. The reordering of writes is across all {@link WriteOperationType}
74 * s.
75 * </p>
76 * <p>
77 * If using a MongoDB server after {@link #REQUIRED_VERSION 2.5.5} a batched
78 * write will result in use of the new write commands.
79 * </p>
80 * <p>
81 * For a more generalized batched write and query capability see the
82 * {@link BatchedAsyncMongoCollection} and {@link MongoCollection#startBatch()}.
83 * </p>
84 *
85 * @api.yes This class is part of the driver's API. Public and protected members
86 * will be deprecated for at least 1 non-bugfix release (version
87 * numbers are <major>.<minor>.<bugfix>) before being
88 * removed or modified.
89 * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
90 */
91 public class BatchedWrite implements Serializable {
92
93 /** The first version of MongoDB to support the {@code aggregate} command. */
94 public static final Version REQUIRED_VERSION = Version.parse("2.5.5");
95
96 /** Serialization version for the class. */
97 private static final long serialVersionUID = 6984498574755719178L;
98
99 /**
100 * Creates a new builder for a {@link BatchedWrite}.
101 *
102 * @return The builder to construct a {@link BatchedWrite}.
103 */
104 public static Builder builder() {
105 return new Builder();
106 }
107
108 /**
109 * Create a batched write with a single delete operation. Users can just use
110 * the {@link MongoCollection#delete} variants and the driver will convert
111 * the deletes to batched writes as appropriate.
112 * <p>
113 * This method avoids the construction of a builder.
114 * </p>
115 *
116 * @param query
117 * The query to find the documents to delete.
118 * @param singleDelete
119 * If true then only a single document will be deleted. If
120 * running in a sharded environment then this field must be false
121 * or the query must contain the shard key.
122 * @param durability
123 * The durability of the delete.
124 * @return The BatchedWrite with the single delete.
125 */
126 public static BatchedWrite delete(final DocumentAssignable query,
127 final boolean singleDelete, final Durability durability) {
128 final DeleteOperation op = new DeleteOperation(query, singleDelete);
129 return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE,
130 durability);
131 }
132
133 /**
134 * Create a batched write with a single inserts operation. Users can just
135 * use the {@link MongoCollection#insert} variants and the driver will
136 * convert the inserts to batched writes as appropriate.
137 * <p>
138 * This method avoids the construction of a builder.
139 * </p>
140 *
141 * @param continueOnError
142 * If the insert should continue if one of the documents causes
143 * an error.
144 * @param durability
145 * The durability for the insert.
146 * @param documents
147 * The documents to add to the collection.
148 * @return The BatchedWrite with the inserts.
149 */
150 public static BatchedWrite insert(final boolean continueOnError,
151 final Durability durability, final DocumentAssignable... documents) {
152 final List<WriteOperation> ops = new ArrayList<WriteOperation>(
153 documents.length);
154 for (final DocumentAssignable doc : documents) {
155 ops.add(new InsertOperation(doc));
156 }
157 return new BatchedWrite(ops,
158 continueOnError ? BatchedWriteMode.SERIALIZE_AND_CONTINUE
159 : BatchedWriteMode.SERIALIZE_AND_STOP, durability);
160 }
161
162 /**
163 * Create a batched write with a single update operation. Users can just use
164 * the {@link MongoCollection#update} variants and the driver will convert
165 * the updates to batched writes as appropriate.
166 *
167 * @param query
168 * The query for the update.
169 * @param update
170 * The update for the update.
171 * @param multiUpdate
172 * If true then the update will update multiple documents.
173 * @param upsert
174 * If no document is found then upsert the document.
175 * @param durability
176 * The durability of the update.
177 * @return The BatchedWrite with the single update.
178 */
179 public static BatchedWrite update(final DocumentAssignable query,
180 final DocumentAssignable update, final boolean multiUpdate,
181 final boolean upsert, final Durability durability) {
182 final UpdateOperation op = new UpdateOperation(query, update,
183 multiUpdate, upsert);
184 return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE,
185 durability);
186 }
187
188 /** The durability for the writes. */
189 private final Durability myDurability;
190
191 /** The mode for submitting the writes to the server. */
192 private final BatchedWriteMode myMode;
193
194 /** The writes to submit to the server. */
195 private final List<WriteOperation> myWrites;
196
197 /**
198 * Creates a new BatchedWrite.
199 *
200 * @param builder
201 * The builder for the writes.
202 */
203 protected BatchedWrite(final Builder builder) {
204 myWrites = Collections.unmodifiableList(new ArrayList<WriteOperation>(
205 builder.myWrites));
206 myMode = builder.myMode;
207 myDurability = builder.myDurability;
208 }
209
210 /**
211 * Creates a new BatchedWrite.
212 *
213 * @param ops
214 * The operations for the batch.
215 * @param mode
216 * The mode for the batch.
217 * @param durability
218 * The durability for the batch.
219 */
220 private BatchedWrite(final List<WriteOperation> ops,
221 final BatchedWriteMode mode, final Durability durability) {
222 myWrites = Collections.unmodifiableList(ops);
223 myMode = mode;
224 myDurability = durability;
225 }
226
227 /**
228 * Creates a new BatchedWrite.
229 *
230 * @param op
231 * The single operation for the batch.
232 * @param mode
233 * The mode for the batch.
234 * @param durability
235 * The durability for the batch.
236 */
237 private BatchedWrite(final WriteOperation op, final BatchedWriteMode mode,
238 final Durability durability) {
239 this(Collections.singletonList(op), mode, durability);
240 }
241
242 /**
243 * Returns the durability for the writes.
244 *
245 * @return The durability for the writes.
246 */
247 public Durability getDurability() {
248 return myDurability;
249 }
250
251 /**
252 * Returns the mode for submitting the writes to the server.
253 *
254 * @return The mode for submitting the writes to the server.
255 */
256 public BatchedWriteMode getMode() {
257 return myMode;
258 }
259
260 /**
261 * Returns the writes to submit to the server.
262 *
263 * @return The writes to submit to the server.
264 */
265 public List<WriteOperation> getWrites() {
266 return myWrites;
267 }
268
269 /**
270 * Creates write commands for all of the insert, updates and deletes. The
271 * number and order of the writes is based on the {@link #getMode() mode}.
272 *
273 * @param collectionName
274 * The name of the collection the documents will be inserted
275 * into.
276 * @param maxCommandSize
277 * The maximum document size.
278 * @param maxOperationsPerBundle
279 * The maximum number of writes to include in each bundle.
280 * @return The list of command documents to be sent.
281 */
282 public List<Bundle> toBundles(final String collectionName,
283 final long maxCommandSize, final int maxOperationsPerBundle) {
284 switch (getMode()) {
285 case REORDERED: {
286 return createOptimized(collectionName, maxCommandSize,
287 maxOperationsPerBundle);
288 }
289 case SERIALIZE_AND_CONTINUE: {
290 return createSerialized(collectionName, maxCommandSize,
291 maxOperationsPerBundle, false);
292 }
293 default: {
294 return createSerialized(collectionName, maxCommandSize,
295 maxOperationsPerBundle, true);
296 }
297 }
298 }
299
300 /**
301 * Adds the document to the array of documents.
302 *
303 * @param array
304 * The array to add the operation to.
305 * @param operation
306 * The operation to add.
307 */
308 private void add(final ArrayBuilder array, final WriteOperation operation) {
309 switch (operation.getType()) {
310 case INSERT: {
311 final InsertOperation insertOperation = (InsertOperation) operation;
312
313 array.add(insertOperation.getDocument());
314 break;
315 }
316 case UPDATE: {
317 final UpdateOperation updateOperation = (UpdateOperation) operation;
318 final DocumentBuilder update = array.push();
319
320 update.add("q", updateOperation.getQuery());
321 update.add("u", updateOperation.getUpdate());
322 if (updateOperation.isUpsert()) {
323 update.add("upsert", true);
324 }
325 if (updateOperation.isMultiUpdate()) {
326 update.add("multi", true);
327 }
328 break;
329 }
330 case DELETE: {
331 final DeleteOperation deleteOperation = (DeleteOperation) operation;
332 array.push().add("q", deleteOperation.getQuery())
333 .add("limit", deleteOperation.isSingleDelete() ? 1 : 0);
334 break;
335 }
336 }
337 }
338
339 /**
340 * Adds the durability ('writeConcern') to the command document.
341 *
342 * @param command
343 * The command document to add the durability to.
344 * @param durability
345 * The durability to add. May be <code>null</code>.
346 */
347 private void addDurability(final DocumentBuilder command,
348 final Durability durability) {
349 if (durability != null) {
350 final DocumentBuilder durabilityDoc = command.push("writeConcern");
351 if (durability.equals(Durability.NONE)) {
352 durabilityDoc.add("w", 0);
353 }
354 else if (durability.equals(Durability.ACK)) {
355 durabilityDoc.add("w", 1);
356 }
357 else {
358 boolean first = true;
359 for (final Element part : durability.asDocument()) {
360 if (first) {
361 // The first element is "getlasterror".
362 first = false;
363 }
364 else {
365 durabilityDoc.add(part);
366 }
367 }
368 }
369 }
370 }
371
372 /**
373 * Creates a {@link DocumentToLargeException} for the operation.
374 *
375 * @param operation
376 * The large operation.
377 * @param size
378 * The size of the operation.
379 * @param maxCommandSize
380 * The maximum size of the operation.
381 * @return The created exception.
382 */
383 private DocumentToLargeException createDocumentToLargeException(
384 final WriteOperation operation, final int size,
385 final int maxCommandSize) {
386
387 Document doc = EmptyDocument.INSTANCE;
388
389 switch (operation.getType()) {
390 case INSERT: {
391 final InsertOperation insertOperation = (InsertOperation) operation;
392 doc = insertOperation.getDocument();
393 break;
394 }
395 case UPDATE: {
396 final UpdateOperation updateOperation = (UpdateOperation) operation;
397 doc = updateOperation.getQuery();
398 final Document update = updateOperation.getUpdate();
399 if (doc.size() < update.size()) {
400 doc = update;
401 }
402 break;
403 }
404 case DELETE: {
405 final DeleteOperation deleteOperation = (DeleteOperation) operation;
406 doc = deleteOperation.getQuery();
407 break;
408 }
409 }
410
411 return new DocumentToLargeException(size, maxCommandSize, doc);
412 }
413
414 /**
415 * Reorders the writes into as few write commands as possible.
416 * <p>
417 * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a
418 * href=
419 * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56"
420 * >16K</a>). This is for the command overhead. We don't explicitly use the
421 * overhead but we may end up implicitly using it in the case of a operation
422 * that is just at or below maxCommandSize. For those cases we start the
423 * 'head' map below with the full map. That allows the big operations to be
424 * added to command documents of there own once the command overhead has
425 * been factored in.
426 * </p>
427 *
428 * @param collectionName
429 * The name of the collection the documents will be inserted
430 * into.
431 * @param maxCommandSize
432 * The maximum document size.
433 * @param maxOperationsPerBundle
434 * The maximum number of writes to include in each bundle.
435 * @return The list of command documents to be sent.
436 */
437 private List<Bundle> createOptimized(final String collectionName,
438 final long maxCommandSize, final int maxOperationsPerBundle) {
439 // Bucket the operations and sort by size.
440 Map<WriteOperationType, SortedMap<Long, List<WriteOperation>>> operationsBuckets;
441 operationsBuckets = new LinkedHashMap<WriteOperationType, SortedMap<Long, List<WriteOperation>>>();
442 for (final WriteOperation writeOp : getWrites()) {
443 SortedMap<Long, List<WriteOperation>> operations = operationsBuckets
444 .get(writeOp.getType());
445 if (operations == null) {
446 operations = new TreeMap<Long, List<WriteOperation>>();
447 operationsBuckets.put(writeOp.getType(), operations);
448 }
449
450 final Long size = Long.valueOf(sizeOf(-1, writeOp));
451 List<WriteOperation> list = operations.get(size);
452 if (list == null) {
453 list = new LinkedList<WriteOperation>();
454 operations.put(size, list);
455 }
456 list.add(writeOp);
457 }
458
459 // Check if any operation is too big.
460 final Long maxMessageSize = Long.valueOf(maxCommandSize + 1);
461 for (final SortedMap<Long, List<WriteOperation>> operations : operationsBuckets
462 .values()) {
463 if (!operations.tailMap(maxMessageSize).isEmpty()) {
464 final Long biggest = operations.lastKey();
465 final List<WriteOperation> operation = operations.get(biggest);
466 throw createDocumentToLargeException(operation.get(0),
467 biggest.intValue(), (int) maxCommandSize);
468 }
469 }
470
471 // Now build commands packing the operations into a few messages as
472 // possible.
473 final List<Bundle> commands = new ArrayList<Bundle>();
474 final List<WriteOperation> bundled = new ArrayList<WriteOperation>(
475 Math.min(maxOperationsPerBundle, myWrites.size()));
476 final DocumentBuilder command = BuilderFactory.start();
477 for (final Map.Entry<WriteOperationType, SortedMap<Long, List<WriteOperation>>> entry : operationsBuckets
478 .entrySet()) {
479 final SortedMap<Long, List<WriteOperation>> operations = entry
480 .getValue();
481 while (!operations.isEmpty()) {
482 final ArrayBuilder docs = start(entry.getKey(), collectionName,
483 false, command);
484 long remaining = maxCommandSize - command.build().size();
485
486 SortedMap<Long, List<WriteOperation>> head = operations;
487 int index = 0;
488 while (!head.isEmpty()
489 && (bundled.size() < maxOperationsPerBundle)) {
490 final Long biggest = head.lastKey();
491 final List<WriteOperation> bigOps = head.get(biggest);
492 final WriteOperation operation = bigOps.remove(0);
493 if (bigOps.isEmpty()) {
494 head.remove(biggest);
495 }
496
497 add(docs, operation);
498 bundled.add(operation);
499
500 remaining -= sizeOf(index, operation);
501 index += 1;
502 head = operations.headMap(Long.valueOf(remaining
503 - sizeOfIndex(index)));
504 }
505
506 commands.add(new Bundle(command.build(), bundled));
507 bundled.clear();
508 }
509 }
510
511 return commands;
512 }
513
514 /**
515 * Creates write commands for each sequence of insert, updates and deletes.
516 * <p>
517 * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a
518 * href=
519 * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56"
520 * >16K</a>). This is for the command overhead. We don't explicitly use the
521 * overhead but we may end up using it in the case of a operation that is
522 * just at or below maxCommandSize. That is why we start the 'head' map
523 * below with the full map. That allows those big operations to be added to
524 * commands of there own once the command overhead has been factored in.
525 * </p>
526 *
527 * @param collectionName
528 * The name of the collection the documents will be inserted
529 * into.
530 * @param maxCommandSize
531 * The maximum document size.
532 * @param stopOnError
533 * If true then the ordered flag is set to true.
534 * @param maxOperationsPerBundle
535 * The maximum number of writes to include in each bundle.
536 * @return The list of command documents to be sent.
537 */
538 private List<Bundle> createSerialized(final String collectionName,
539 final long maxCommandSize, final int maxOperationsPerBundle,
540 final boolean stopOnError) {
541 final List<Bundle> commands = new ArrayList<Bundle>();
542 final DocumentBuilder command = BuilderFactory.start();
543
544 final List<WriteOperation> toSend = getWrites();
545 final List<WriteOperation> bundled = new ArrayList<WriteOperation>(
546 Math.min(maxOperationsPerBundle, myWrites.size()));
547
548 ArrayBuilder opsArray = null;
549 WriteOperationType lastType = null;
550
551 long remaining = maxCommandSize;
552 for (final WriteOperation writeOp : toSend) {
553 long size = sizeOf(-1, writeOp);
554 final long indexSize = sizeOfIndex(bundled.size());
555 if (maxCommandSize < size) {
556 throw createDocumentToLargeException(writeOp, (int) size,
557 (int) maxCommandSize);
558 }
559 size += indexSize; // Add in the index overhead.
560
561 // Close a command if change type or too big.
562 if (!bundled.isEmpty()
563 && ((lastType != writeOp.getType())
564 || ((remaining - size) < 0) || (maxOperationsPerBundle <= bundled
565 .size()))) {
566 commands.add(new Bundle(command.build(), bundled));
567 bundled.clear();
568 }
569
570 // Start a command? - Maybe after closing?
571 if (bundled.isEmpty()) {
572 opsArray = start(writeOp.getType(), collectionName,
573 stopOnError, command);
574 lastType = writeOp.getType();
575 remaining = (maxCommandSize - command.build().size());
576 }
577
578 // Add the operation.
579 add(opsArray, writeOp);
580 bundled.add(writeOp);
581
582 // Remove the size of the operation from the remaining.
583 remaining -= size;
584 }
585
586 if (!bundled.isEmpty()) {
587 commands.add(new Bundle(command.build(), bundled));
588 }
589
590 return commands;
591 }
592
593 /**
594 * Returns the size of the encoded operation.
595 * <p>
596 * For an {@code InsertOperation} this is the size of the document to
597 * insert.
598 * <p>
599 * For an {@code UpdateOperation} this includes the space for:
600 * <dl>
601 * <dt>Document Overhead</dt>
602 * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd>
603 * <dt>'q' field</dt>
604 * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
605 * <dt>'u' field</dt>
606 * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
607 * <dt>'upsert' field</dt>
608 * <dd>name (7 bytes), type (1 byte), value (1 byte)</dd>
609 * <dt>'multi' field</dt>
610 * <dd>name (6 bytes), type (1 byte), value (1 byte)</dd>
611 * </dl>
612 * </p>
613 * <p>
614 * For a {@code DeleteOperation} this includes the space for:
615 * <dl>
616 * <dt>Document Overhead</dt>
617 * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd>
618 * <dt>'q' field</dt>
619 * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
620 * <dt>'limit' field</dt>
621 * <dd>name (6 bytes), type (1 byte), value (4 bytes)</dd>
622 * </dl>
623 *
624 * @param index
625 * The index of the operation in the operations array.
626 * @param operation
627 * The operation to determine the size of.
628 * @return The size of the operation.
629 */
630 private long sizeOf(final int index, final WriteOperation operation) {
631 long result = 0;
632 switch (operation.getType()) {
633 case INSERT: {
634 final InsertOperation insertOperation = (InsertOperation) operation;
635 result = sizeOfIndex(index) + insertOperation.getDocument().size();
636 break;
637 }
638 case UPDATE: {
639 final UpdateOperation updateOperation = (UpdateOperation) operation;
640 result = sizeOfIndex(index) + updateOperation.getQuery().size()
641 + updateOperation.getUpdate().size() + 29;
642 break;
643 }
644 case DELETE: {
645 final DeleteOperation deleteOperation = (DeleteOperation) operation;
646 result = sizeOfIndex(index) + deleteOperation.getQuery().size()
647 + 20;
648 break;
649 }
650 }
651
652 return result;
653 }
654
655 /**
656 * Returns the number of bytes required to encode the index within the array
657 * element.
658 *
659 * @param index
660 * The index to return the size of.
661 * @return The length of the encoded index.
662 */
663 private long sizeOfIndex(final int index) {
664 // For 2.6 the number of items in the array is capped at 1000. This
665 // allows up to 99,999 without resorting to turning the value into
666 // a string which seems like safe enough padding.
667 if (index < 0) {
668 return 0; // For estimating operation sizes.
669 }
670 else if (index < 10) {
671 return 3; // single character plus a null plus a type.
672 }
673 else if (index < 100) {
674 return 4; // two characters plus a null plus a type.
675 }
676 else if (index < 1000) {
677 return 5; // three characters plus a null plus a type.
678 }
679 else if (index < 10000) {
680 return 6; // four characters plus a null plus a type.
681 }
682
683 return Integer.toString(index).length() + 2;
684 }
685
686 /**
687 * Starts a new command document.
688 *
689 * @param operation
690 * The operation to start.
691 * @param collectionName
692 * The collection to operate on.
693 * @param stopOnError
694 * If true then the operations should stop once an error is
695 * encountered. Is mapped to the {@code ordered} field in the
696 * command document.
697 * @param command
698 * The command builder.
699 * @return The {@link ArrayBuilder} for the operations array.
700 */
701 private ArrayBuilder start(final WriteOperationType operation,
702 final String collectionName, final boolean stopOnError,
703 final DocumentBuilder command) {
704
705 String commandName = "";
706 String arrayName = "";
707 switch (operation) {
708 case INSERT: {
709 commandName = "insert";
710 arrayName = "documents";
711 break;
712 }
713 case UPDATE: {
714 commandName = "update";
715 arrayName = "updates";
716 break;
717 }
718 case DELETE: {
719 commandName = "delete";
720 arrayName = "deletes";
721 break;
722 }
723 }
724
725 command.reset();
726 command.add(commandName, collectionName);
727 if (!stopOnError) {
728 command.add("ordered", stopOnError);
729 }
730 addDurability(command, getDurability());
731
732 return command.pushArray(arrayName);
733 }
734
735 /**
736 * Builder for creating {@link BatchedWrite}s.
737 *
738 * @api.yes This class is part of the driver's API. Public and protected
739 * members will be deprecated for at least 1 non-bugfix release
740 * (version numbers are <major>.<minor>.<bugfix>)
741 * before being removed or modified.
742 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
743 */
744 public static class Builder {
745
746 /** The durability for the writes. */
747 protected Durability myDurability;
748
749 /** The mode for submitting the writes to the server. */
750 protected BatchedWriteMode myMode;
751
752 /** The writes to submit to the server. */
753 protected final List<WriteOperation> myWrites;
754
755 /**
756 * Creates a new Builder.
757 */
758 public Builder() {
759 myWrites = new ArrayList<WriteOperation>();
760
761 reset();
762 }
763
764 /**
765 * Constructs a new {@link BatchedWrite} object from the state of the
766 * builder.
767 *
768 * @return The new {@link BatchedWrite} object.
769 */
770 public BatchedWrite build() {
771 return new BatchedWrite(this);
772 }
773
774 /**
775 * Update a document based on a query.
776 * <p>
777 * Defaults to deleting as many documents as match the query.
778 * </p>
779 * <p>
780 * This method is delegates to
781 * {@link #delete(DocumentAssignable, boolean) delete(query, false)}
782 * </p>
783 *
784 * @param query
785 * The query to find the document to delete.
786 * @return This builder for chaining method calls.
787 */
788 public Builder delete(final DocumentAssignable query) {
789 return delete(query, false);
790 }
791
792 /**
793 * Update a document based on a query.
794 * <p>
795 * Defaults to deleting as many documents as match the query.
796 * </p>
797 *
798 * @param query
799 * The query to find the document to delete.
800 * @param singleDelete
801 * If true then only a single document will be deleted. If
802 * running in a sharded environment then this field must be
803 * false or the query must contain the shard key.
804 * @return This builder for chaining method calls.
805 */
806 public Builder delete(final DocumentAssignable query,
807 final boolean singleDelete) {
808 return write(new DeleteOperation(query, singleDelete));
809 }
810
811 /**
812 * Sets the durability for the writes.
813 * <p>
814 * This method delegates to {@link #setDurability(Durability)}.
815 * </p>
816 *
817 * @param durability
818 * The new value for the durability for the writes.
819 * @return This builder for chaining method calls.
820 */
821 public Builder durability(final Durability durability) {
822 return setDurability(durability);
823 }
824
825 /**
826 * Returns the durability for the write.
827 *
828 * @return This durability for the write.
829 */
830 public Durability getDurability() {
831 return myDurability;
832 }
833
834 /**
835 * Adds an insert operation to the batched write.
836 *
837 * @param document
838 * The document to insert.
839 * @return This builder for chaining method calls.
840 */
841 public Builder insert(final DocumentAssignable document) {
842 return write(new InsertOperation(document));
843 }
844
845 /**
846 * Sets the mode for submitting the writes to the server.
847 * <p>
848 * This method delegates to {@link #setMode(BatchedWriteMode)}.
849 * </p>
850 *
851 * @param mode
852 * The new value for the mode for submitting the writes to
853 * the server.
854 * @return This builder for chaining method calls.
855 */
856 public Builder mode(final BatchedWriteMode mode) {
857 return setMode(mode);
858 }
859
860 /**
861 * Resets the builder back to its initial state for reuse.
862 *
863 * @return This builder for chaining method calls.
864 */
865 public Builder reset() {
866 myWrites.clear();
867 myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE;
868 myDurability = null;
869
870 return this;
871 }
872
873 /**
874 * Saves the {@code document} to MongoDB.
875 * <p>
876 * If the {@code document} does not contain an {@code _id} field then
877 * this method is equivalent to: {@link #insert(DocumentAssignable)
878 * insert(document)}.
879 * </p>
880 * <p>
881 * If the {@code document} does contain an {@code _id} field then this
882 * method is equivalent to:
883 * {@link #update(DocumentAssignable, DocumentAssignable)
884 * updateAsync(BuilderFactory.start().add(document.get("_id")),
885 * document, false, true)}.
886 * </p>
887 *
888 * @param document
889 * The document to save.
890 * @return This builder for chaining method calls.
891 */
892 public Builder save(final DocumentAssignable document) {
893 final Document doc = document.asDocument();
894 final Element id = doc.get("_id");
895 if (id == null) {
896 return insert(doc);
897 }
898 return update(BuilderFactory.start().add(id), doc, false, true);
899 }
900
901 /**
902 * Sets the durability for the writes.
903 *
904 * @param durability
905 * The new value for the durability for the writes.
906 * @return This builder for chaining method calls.
907 */
908 public Builder setDurability(final Durability durability) {
909 myDurability = durability;
910 return this;
911 }
912
913 /**
914 * Sets the mode for submitting the writes to the server.
915 *
916 * @param mode
917 * The new value for the mode for submitting the writes to
918 * the server.
919 * @return This builder for chaining method calls.
920 */
921 public Builder setMode(final BatchedWriteMode mode) {
922 myMode = mode;
923 return this;
924 }
925
926 /**
927 * Sets the writes to submit to the server.
928 *
929 * @param writes
930 * The new value for the writes to submit to the server.
931 * @return This builder for chaining method calls.
932 */
933 public Builder setWrites(final List<WriteOperation> writes) {
934 myWrites.clear();
935 if (writes != null) {
936 myWrites.addAll(writes);
937 }
938 return this;
939 }
940
941 /**
942 * Update a document based on a query.
943 * <p>
944 * Defaults to updating a single document and not performing an upsert
945 * if no document is found.
946 * </p>
947 * <p>
948 * This method is delegates to
949 * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean)
950 * update(query, update, false, false)}
951 * </p>
952 *
953 * @param query
954 * The query to find the document to update.
955 * @param update
956 * The update operations to apply to the document.
957 * @return This builder for chaining method calls.
958 */
959 public Builder update(final DocumentAssignable query,
960 final DocumentAssignable update) {
961 return update(query, update, false, false);
962 }
963
964 /**
965 * Update a document based on a query.
966 * <p>
967 * Defaults to updating a single document and not performing an upsert
968 * if no document is found.
969 * </p>
970 *
971 * @param query
972 * The query to find the document to update.
973 * @param update
974 * The update operations to apply to the document.
975 * @param multiUpdate
976 * If true then the update is applied to all of the matching
977 * documents, otherwise only the first document found is
978 * updated.
979 * @param upsert
980 * If true then if no document is found then a new document
981 * is created and updated, otherwise no operation is
982 * performed.
983 * @return This builder for chaining method calls.
984 */
985 public Builder update(final DocumentAssignable query,
986 final DocumentAssignable update, final boolean multiUpdate,
987 final boolean upsert) {
988 return write(new UpdateOperation(query, update, multiUpdate, upsert));
989 }
990
991 /**
992 * Adds a single write to the list of writes to send to the server.
993 *
994 * @param write
995 * The write to add to the list of writes to send to the
996 * server.
997 * @return This builder for chaining method calls.
998 */
999 public Builder write(final WriteOperation write) {
1000 myWrites.add(write);
1001 return this;
1002 }
1003
1004 /**
1005 * Sets the writes to submit to the server.
1006 * <p>
1007 * This method delegates to {@link #setWrites(List)}.
1008 * </p>
1009 *
1010 * @param writes
1011 * The new value for the writes to submit to the server.
1012 * @return This builder for chaining method calls.
1013 */
1014 public Builder writes(final List<WriteOperation> writes) {
1015 return setWrites(writes);
1016 }
1017 }
1018
1019 /**
1020 * Bundle is a container for the write command and the
1021 * {@link WriteOperation} it contains.
1022 *
1023 * @api.yes This class is part of the driver's API. Public and protected
1024 * members will be deprecated for at least 1 non-bugfix release
1025 * (version numbers are <major>.<minor>.<bugfix>)
1026 * before being removed or modified.
1027 */
1028 public static final class Bundle {
1029 /** The command containing the bundled write operations. */
1030 private final Document myCommand;
1031
1032 /** The writes that are bundled in the command. */
1033 private final List<WriteOperation> myWrites;
1034
1035 /**
1036 * Creates a new Bundle.
1037 *
1038 * @param command
1039 * The command containing the bundled write operations.
1040 * @param writes
1041 * The writes that are bundled in the command.
1042 */
1043 protected Bundle(final Document command,
1044 final List<WriteOperation> writes) {
1045 super();
1046 myCommand = command;
1047 myWrites = Collections
1048 .unmodifiableList(new ArrayList<WriteOperation>(writes));
1049 }
1050
1051 /**
1052 * Returns the command containing the bundled write operations.
1053 *
1054 * @return The command containing the bundled write operations.
1055 */
1056 public Document getCommand() {
1057 return myCommand;
1058 }
1059
1060 /**
1061 * Returns the writes that are bundled in the command.
1062 *
1063 * @return The writes that are bundled in the command.
1064 */
1065 public List<WriteOperation> getWrites() {
1066 return myWrites;
1067 }
1068 }
1069 }