MongoDB 2.6 added the ability to batch inserts, updates, and deletes using three new write commands. The driver provides support for these three commands via a unified BatchedWrite class. The BatchedWrite class has an inner BatchedWrite.Builder class that is used to construct the BatchedWrite as a series of insert, update and delete operations.
MongoCollection collection = ...; BatchedWrite.Builder writes = new BatchedWrite.builder(); DocumentBuilder builder = BuilderFactory.start(); for( int i = 0; i < 1000; ++i ) { writes.insert( builder.reset().add("i", i) ); } DocumentBuilder update = BuilderFactory.start(); update.push("$set").add("touched", true); for( int i = 0; i < 1000; ++i ) { writes.update( builder.reset().add("i", i), update ); } writes.delete( builder.reset().add("touched", true), false ); long modified = collection.write( writes ); // Should be 3000.
The BatchedWrite contains a BatchedWriteMode that controls the level of reordering and pipelining the driver can perform. By default the driver uses the SERIALIZE_AND_CONTINUE mode which allows the driver to pack the operations in the order provided but it does not have to wait for the results of one batch before executing the next. See the BatchedWriteMode for other options. To maximize performance use the REORDER mode that allows the driver to reorder operations to maximize the packing of operations in the commands. Care must be take to ensure that the operations are independent when using this mode.
BatchedWrite may still be used with pre-2.6 MongoDB servers/clusters. The driver will automatically switch to using the pre-2.6 insert/update/delete operations.
While the BatchedWrite object is useful it is limited to only insert, update, and delete operations. There are many cases where the user knows the full sequence of operations to be applied and would like to submit them to the server as unit. In order to achieve a generic batching capability we will need some way of deferring the results of each operation. Luckly, this is already solved for the driver via ListenableFutures, Callback, and LambdaCallback. We simply need a way to spool up the operations and a way to indicate when they should be flushed to the server. That is the role of the BatchedAsyncMongoCollection interface.
We get an instance of the BatchedAsyncMongoCollection from a MongoCollection. Once we have the batched collection we can invoke any of the asynchronous operations we want. The driver will not send any of them to the server and instead simply remember the operations until flush() or close() is called. (BatchedAsyncMongoCollection extends Closeable to support try-with-finally blocks.) Once the batch is flushed all of the commands are sent to the server. Any sequential inserts, updates, and deletes will also be migrated to using the batch write commands, if the server supports them. There are some important limitations for updates and deletes that are explained in the interface JavaDoc.
We have two demo applications that show the flexibility of the BatchedAsyncMongoCollection interface. Both are located in the mongodb-async-examples GitHub repository. The first demo uses ListenableFutures to capture the results. The second is the same but uses Java 8 Lamba Expressions. Below is an excerpt from the full Java 8 example:
// Batching requests is accomplished via the BatchedAsyncMongoCollection // interface which we get from the startBatch() method. The batch needs // to always be closed to submit the requests so we use a // try-with-resources. final CountDownLatch latch = new CountDownLatch(1); try (BatchedAsyncMongoCollection batch = theCollection.startBatch()) { // Now we can do as many CRUD operations we want. Even commands like // are supported. // We need some data. Lets create a documents with the _id field 'a' // thru 'z'. final DocumentBuilder builder = BuilderFactory.start(); for (char c = 'a'; c <= 'z'; ++c) { builder.reset().add("_id", String.valueOf(c)); // Lambda is called once the batch completes. batch.insertAsync((e, count) -> { }, builder); } // A query works. final Find.Builder find = Find.builder(); find.query(where("_id").equals("a")); batch.findOneAsync((e, found) -> { System.out.println("Find 'a': "); System.out.println(" " + found); }, find); // An update too. final DocumentBuilder updateDoc = BuilderFactory.start(); updateDoc.push("$set").add("marked", true); batch.updateAsync((e, updated) -> { System.out.println("Update all of the documents: " + updated); }, Find.ALL, updateDoc, true, false); // Delete should work. batch.deleteAsync((e, deleted) -> { System.out.println("Delete 'b': " + deleted); }, where("_id").equals("b")); // Commands... It is all there. batch.countAsync((e, count) -> { System.out.println("Count all documents: " + count); }, Find.ALL); // Lets look at the 'a' doc one more time. It should have the // "marked" field now. batch.findOneAsync((e, found) -> { System.out.println("Find 'a' after the update: "); System.out.println(" " + found); latch.countDown(); }, find); // At this point nothing has been sent to the server. All of the // messages have been "spooled" waiting to be sent. // All of the messages will use the same connection // (unless a read preference directs a query to a different // server). } // Send the batch. /** * Should produce output like: * * <pre> * <code> * Find 'a': * { '_id' : 'a' } * Update all of the documents: 26 * Delete 'b': 1 * Count all documents: 25 * Find 'a' after the update: * { * '_id' : 'a', * marked : true * } * </code> * </pre> */ latch.await();