Using the Asynchronous API

Using the driver's asynchronous interfaces causes some developers apprehension and doubt on how best to proceed. This guide is intended to lower the barrier for using the asynchronous APIs and provide guidance on best practices.

Getting Started with the Asynchronous API

The best way to get started with the asynchronous API is to not use the API at all. Simply porting your application to the driver may provide enough of a performance boost without having to use the asynchronous API directly. Once you have ported your application you will also have a better understanding of what parts of your application will benefit the most.

Assuming you have already ported the application and still need more performance: Choose a single processing flow that is not meeting its performance objectives, or is causing the application to stall waiting for responses from MongoDB. When looking at various flows choose the most common or worst performing flow. Re-write that flow using the suggestions below and then reevaluate the applications performance. If its performance is still not satisfactory, update another flow, reevaluate, and repeat until the application is performing as needed.

Asynchronous Options

The asynchronous API is broken into two flavors. One is based on Futures and the other is based on Callbacks. We will cover futures first and then the callback versions but before we get into the details it is important to realize that mixing and matching the asynchronous APIs is perfectly acceptable and even encouraged. It is even possible to mix the asynchronous APIs with the traditional methods in high performance applications, if done carefully. More on that later.

Asynchronous, lock free processing is hard. There is no doubt that in order to achieve optimal performance the developer needs to think about how to best achieve the desired result. The good news is that when done right the improvements in performance can be extreme.

There is one feature of the driver that is often over looked but can help immensely with asynchronous programming. The MongoClient interface has a method: asSerializedClient(). The method returns what looks like another normal instance of a MongoClient. It has one very special property. All of the requests to the server that are derived from that MongoClient will be sent via a single logical connection to the cluster and so will be serialized and processed in the same order on the server as they are issued on the client. Note that ReadPreferences and sharding can change the server used but all requests to a single server will be processed in the same order as the client issues them. We have intentionally made the asSerializedClient() and MongoDatabase and MongoCollection objects as lightweight as possible to encourage their use. There are many problems that are extremely difficult to solve without the asSerializedClient() guarantees.

Future Based API

The Future based API is the simplest asynchronous API to use. The method will return an appropriately typed Future to the caller. To retrieve the value, simply call get() on the Future returned. Consider the following code:

MongoCollection collection = ....;

// Trivial Future usage.  Not a good use of Futures.
Future<Long> future = collection.countAsync();

System.out.printf( "There are %,d documents in the collection.%n", future.get() );

A couple of items to point out:

  • The countAsync() method, in all likelihood, returned to the caller before the request was even received by the MongoDB servers.
  • The Future.get() method blocks the caller until the response is received from the server.

Under the hood the driver actually uses the Future interface to implement the synchronous interface. Given that, using the Future like we did above will provide little to no benefit over using the synchronous interface. The question is, when can the Future interface provide benefit? Generally that can be summarized as:

  • Use the Future API when you can significantly delay the need to retrieve the value from the Future.

What does that mean? As a way of explaining let me give a few examples.

Examples Using the Future API.

Using a Pending Results Queue

For the first example, lets consider the case of needing to process the contents of a file. Each line in the file needs to update a different document within MongoDB but we only really care that each update is eventually applied and we want to process the file as quickly as possible. Here is what the synchronous version of the logic might look like:

String line;
MongoCollection collection = null;

BufferedReader reader = new BufferedReader(new StringReader("f"));
while( (line = reader.readLine()) != null )  {
    Document query = queryForLine(line);
    Document update = updateForLine(line);
    
    long result = collection.update(query, update);
    processResult(result);
}

There are a number of ways to approach this using futures but probably the easiest is to use a BlockingQueue. After submitting the request to MongoDB we will add the Future to the queue. When the queue is full we process a single Future before processing the next line (and adding its Future) to the queue. In this way we can interleave our processing of the file with the processing of the replies from the server. Using the queue of pending results might look like:

String line;
MongoCollection collection = null;
BlockingQueue<Future<Long>> pendingResults = new ArrayBlockingQueue<Future<Long>>(1000);

BufferedReader reader = new BufferedReader(new StringReader("f"));
while ((line = reader.readLine()) != null) {
    Document query = queryForLine(line);
    Document update = updateForLine(line);

    Future<Long> result = collection.updateAsync(query, update);

    while (!pendingResults.offer(result)) {
        Future<Long> delayed = pendingResults.take();

        processResult(delayed);
    }
}

Future<Long> delayed;
while ((delayed = pendingResults.poll()) != null) {
    processResult(delayed);
}

We have introduced two more loops. The first processes the results when the queue gets full while we are iterating over the lines in the file. The second processes all of the remaining results once we have finished processing the file. Admittedly, the code is a little more complex, but depending on the complexity of the processing required for the file and for the updates on MongoDB, it should run close to optimal.

Batching Requests / Delayed Evaluation

The other reason for using Futures is when the client has either a number of requests or a request can be sent to MongoDB and a client side task (that takes some non-trivial amount of time) is done before the response is evaluated. This approach, and what is a non-trivial amount of time, is application specific. Generally, the longer you defer evaluating the future the better. We will return to this concept as part of the section on mixing Futures and Callbacks.

A good example of the delayed evaluation is within the driver itself. The implementation of the MongoIterator sends a request for the next batch of documents when a user starts consuming a new batch. The Future for the reply is held in the iterator until the user finishes consuming the previous batch. Given any non-trivial processing of the documents, and a little luck, the reply for a batch will be ready before the previous batch is exhausted and the client will not have to pause waiting for more results from the server.

When You Just Don't Care

The last case we will look at for Futures is when the response from the server does not matter. It may seem strange to say but once you start fully embracing an asynchronous programming style there will be many cases where you don't care what the result of individual requests are. There will be specific points in the processing where you will need to know the current state of a document. Between those points will be a series of updates and deletes that are performed by pipelining with a serialized client. They will normally end with a query or FindAndModify to see the results of all of the previous operations. Using the Future interface and Durability.NONE will optimize the processing of those intermediate updates and deletes.

Error Handling with the Future API.

Any error encountered while processing the request will be thrown when the get() method of the future is called. The exception is wrapped in an ExecutionException. The original exception can be retrieved from the ExecutionException via the ExecutionException.getCause() method.

Callback Based API

Using the Callback asynchronous API requires the most work on the developer's part but if done carefully can maintain the clarity of the code and also ensure maximum performance. To use the Callback API the developer needs to implement and instantiate a specialized hander for the results of each unique use case to the MongoDB server. The class will extend either the Callback or LambdaCallback interface. The Callback interface defines two methods: Callback.callback(T) and Callback.exception(Throwable). The LambdaCallback interface was added to support Java 8 Lambda methods and defines a single method: LambdaCallback.accept(Throwable,V). Initial implementations of these interfaces are commonly created as anonymous inner classes or lambda expressions.

When switching to the Callback interface, if there are any callbacks that are not lightweight then it is highly recommended that an Executor be provided in the MongoClientConfiguration. This enabled the Callback handling to be off-loaded from the driver's receive thread.

The main benefit of the Callback API is that it can remove all need to wait for data. All requests are sent to the server and when the response is ready, it is processed immediately.

Example Using the Callback API.

Lets return to our File processing example. Using the Callback interface is a fairly minor change to the original processing. Using the anonymous inner class obfuscates the following example but the intent is still fairly clear.

String line;
MongoCollection collection = null;

BufferedReader reader = new BufferedReader(new StringReader("f"));
while ((line = reader.readLine()) != null) {
    Document query = queryForLine(line);
    Document update = updateForLine(line);

    collection.updateAsync(new Callback<Long>() {
        public void callback(Long result) {
            processResult(result);
        }
        public void exception(Throwable thrown) {
            // Handle error.
        }
    }, query, update);
}

For a LambdaCallback using a Java 8 lambda expression this becomes even more succinct.

String line;
MongoCollection collection = null;

BufferedReader reader = new BufferedReader(new StringReader("f"));
while ((line = reader.readLine()) != null) {
    Document query = queryForLine(line);
    Document update = updateForLine(line);

    collection.updateAsync((e, result) -> {
            if( e ) {
                // Handle error.
            } else {
                processResult(result)
            }
        }, query, update);
}

The anonymous inner class can be moved to a nested or top level class but then the link between the result of one request leading to the next stage of processing is lost. To recover the codes flow we have successfully used the following strategy:

  • For each processing flow define the series of stages that must be completed. Each stage ends with an asynchronous request to MongoDB.
  • Create a series of methods in the class responsible for the processing named something like: <flow_name><stage_name>(...). The arguments to each stage are the results of the previous stage's asynchronous request to MongoDB and any other state that needs to be carried forward. If your project uses automated code reformatting and sorting name the stages with a sortable prefix, e.g., flowName1Prepare, flowName2Compose, etc.
  • Create simple nested and/or top level classes to act as the callback and bridge from one stage to the next. An instance of the callback will be used with the asynchronous request at the end of one stage and its callback(...) method will simply invoke the method for the next stage.

The combination of the method naming and ordering clearly communicates the processing flow to future developers. By keeping all logic out of the Callbacks we ensure they remain clear and concise bridge components.

Error Handling with the Callback API.

Any errors encountered while processing the request will be explicitly given to the Callback via the exception(Throwable) method. One of the virtues of the Callback method or handling asynchronous requests is that it forces the developer to explicitly deal with failures. A common mechanism is to retry the request.

Retries with Callbacks

The following base class can be used to intelligently retry requests to the MongoDB server. The logic for actually retrying the request is left to the derived class via a retry(Throwable).

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.allanbank.mongodb.Callback;

/**
 * AbstractRetryCallbackHandler provides a base class for {@link Callback}
 * instances that will retry the operation.
 * 
 * @param <V>
 *            The type of the callback's result.
 */
public abstract class AbstractRetryCallbackHandler<V> implements Callback<V> {

    /** The maximum amount of time to pause between retries. */
    public static final long MAX_PAUSE_MS = TimeUnit.MINUTES.toMillis(1);

    /** The executor for retries. */
    private final ScheduledExecutorService myExecutor;

    /** The maximum number of times to retry. */
    private final int myMaximumRetries;

    /** The number of milliseconds to wait before attempting another retry. */
    private long myNextPauseMS;

    /** The number of times we have attemtped to retry. */
    private int myRetries;

    /**
     * Creates a new AbstractRetryCallbackHandler.
     * 
     * @param retries
     *            The number of times we have attemtped to retry.
     * @param executor
     *            he executor for retries.
     */
    public AbstractRetryCallbackHandler(final int retries,
            final ScheduledExecutorService executor) {
        myMaximumRetries = retries;
        myExecutor = executor;
        myRetries = 0;
        myNextPauseMS = 1;
    }

    /**
     * Called when all of the retry attempts have been exhausted.
     * 
     * @param thrown
     *            The error causing the retry.
     */
    public abstract void dead(final Throwable thrown);

    /**
     * {@inheritDoc}
     * <p>
     * Overridden to retry the request after a suitable delay.
     * </p>
     */
    @Override
    public void exception(final Throwable thrown) {
        myRetries += 1;
        if (myRetries < myMaximumRetries) {

            myExecutor.schedule(new Runnable() {

                @Override
                public void run() {
                    retry(thrown);  
                    
                }
            }, myNextPauseMS, TimeUnit.MILLISECONDS);

            myNextPauseMS = Math.min(myNextPauseMS * 2, MAX_PAUSE_MS);
        }
        else {
            dead(thrown);
        }
    }

    /**
     * Called to attempt a retry.
     * 
     * @param thrown
     *            The error causing the retry.
     */
    public abstract void retry(final Throwable thrown);
}

Note: It is not always possible to simply repeat requests to the MongoDB server. Under some circumstances it is possible to use a findAndModify command (or two) to look for the case that the initial request was successful (and potentially, also the case it was not successful) and return the appropriate results.

Mixing Futures and Callbacks

Mixing Futures and Callbacks is most advantageous in situations where there is a need to collect the results of multiple MongoDB requests. By delaying the processing of any Future until the invocation of a Callback we achieve the goal of delaying evaluation. We can also be reasonably certain that, if not already, the future will block for a significantly reduced amount of time. This can sometimes greatly reduce the complexity of using asynchronous problems. By using the serialized client discussed above we can ensure that the Futures do not block.

Mixing Synchronous use with Asynchronous Usage.

In general, you do not want to mix the asynchronous and synchronous programming models. There are a couple caveats to that statement.

  • Using synchronous requests to start asynchronous processing flows - In this case there is, for example, a long running query that is started and each document that is returned is then used to start an asynchronous processing flow. The initial query, in this case, is not gating the performance of the system as an asynchronous or synchronous process takes similar amounts of time to fetch all of the documents.
  • Using synchronous processing for periodic housekeeping - The canonical example is doing batch updates or deletes of documents from a collection. Again, the batch update is not on the performance critical aspect of the application.

The above should not be viewed to infer that using the asynchronous interface is an all or nothing proposition for a given application. As stated above, migration of individual flows until the application performance is to the desired level should be driving the conversion process. Within a flow, mixing asynchronous and synchronous requests is discouraged.