Coverage Report - com.allanbank.mongodb.client.connection.rs.ReplicaSetReconnectStrategy
 
Classes in this File Line Coverage Branch Coverage Complexity
ReplicaSetReconnectStrategy
84%
97/115
70%
35/50
5
ReplicaSetReconnectStrategy$1
100%
1/1
N/A
5
 
 1  
 /*
 2  
  * #%L
 3  
  * ReplicaSetReconnectStrategy.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.connection.rs;
 22  
 
 23  
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 24  
 
 25  
 import java.io.IOException;
 26  
 import java.util.Collections;
 27  
 import java.util.HashMap;
 28  
 import java.util.List;
 29  
 import java.util.Map;
 30  
 import java.util.Set;
 31  
 import java.util.concurrent.ConcurrentHashMap;
 32  
 import java.util.concurrent.ExecutionException;
 33  
 import java.util.concurrent.Future;
 34  
 import java.util.concurrent.TimeUnit;
 35  
 import java.util.concurrent.TimeoutException;
 36  
 import java.util.logging.Level;
 37  
 
 38  
 import com.allanbank.mongodb.MongoClientConfiguration;
 39  
 import com.allanbank.mongodb.bson.Document;
 40  
 import com.allanbank.mongodb.bson.Element;
 41  
 import com.allanbank.mongodb.bson.element.StringElement;
 42  
 import com.allanbank.mongodb.client.connection.Connection;
 43  
 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
 44  
 import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
 45  
 import com.allanbank.mongodb.client.message.IsMaster;
 46  
 import com.allanbank.mongodb.client.message.Reply;
 47  
 import com.allanbank.mongodb.client.state.AbstractReconnectStrategy;
 48  
 import com.allanbank.mongodb.client.state.Cluster;
 49  
 import com.allanbank.mongodb.client.state.Server;
 50  
 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
 51  
 import com.allanbank.mongodb.util.IOUtils;
 52  
 import com.allanbank.mongodb.util.log.Log;
 53  
 import com.allanbank.mongodb.util.log.LogFactory;
 54  
 
 55  
 /**
 56  
  * ReplicaSetReconnectStrategy provides a {@link ReconnectStrategy} designed for
 57  
  * replica sets. The reconnect strategy attempts to locate the primary member of
 58  
  * the replica set by:
 59  
  * <ol>
 60  
  * <li>Querying each member of the replica set for the primary server.</li>
 61  
  * <li>Once a primary server has been identified by a member of the replica set
 62  
  * (the putative primary) the putative primary server is queried for the primary
 63  
  * server.</li>
 64  
  * <ol>
 65  
  * <li>If the putative primary concurs that it is the primary then the search
 66  
  * completes and the primary server's connection is used.</li>
 67  
  * <li>If the putative primary does not concur then the search continues
 68  
  * scanning each server in turn for the primary server.</li>
 69  
  * </ol>
 70  
  * </ol>
 71  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 72  
  *         mutated in incompatible ways between any two releases of the driver.
 73  
  * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
 74  
  */
 75  0
 public class ReplicaSetReconnectStrategy extends AbstractReconnectStrategy {
 76  
 
 77  
     /**
 78  
      * The initial amount of time, in milliseconds, to pause waiting for a
 79  
      * server to take over as the primary.
 80  
      */
 81  
     public static final int INITIAL_RECONNECT_PAUSE_TIME_MS = 10;
 82  
 
 83  
     /**
 84  
      * The maximum amount of time, in milliseconds, to pause waiting for a
 85  
      * server to take over as the primary.
 86  
      */
 87  
     public static final int MAX_RECONNECT_PAUSE_TIME_MS = 1000;
 88  
 
 89  
     /** The logger for the {@link ReplicaSetReconnectStrategy}. */
 90  1
     protected static final Log LOG = LogFactory
 91  
             .getLog(ReplicaSetReconnectStrategy.class);
 92  
 
 93  
     /** The putative primaries we could not connect to; used to log the failure as a warning only once per server. */
 94  23
     private final Set<Server> myDeadServers = Collections
 95  
             .newSetFromMap(new ConcurrentHashMap<Server, Boolean>());
 96  
 
 97  
     /**
 98  
      * Creates a new ReplicaSetReconnectStrategy with an empty set of dead servers.
 99  
      */
 100  
     public ReplicaSetReconnectStrategy() {
 101  23
         super();
 102  23
     }
 103  
 
 104  
     /**
 105  
      * {@inheritDoc}
 106  
      * <p>
 107  
      * Overridden to search for the primary server in the replica set. This will
 108  
      * only continue until the
 109  
      * {@link MongoClientConfiguration#getReconnectTimeout()} has expired.
 110  
      * </p>
      * 
      * @param oldConnection
      *            The connection being replaced. Not used by this strategy.
      * @return A new {@link ReplicaSetConnection} wrapping the connection to
      *         the primary, or null if no primary could be confirmed.
 111  
      */
 112  
     @Override
 113  
     public ReplicaSetConnection reconnect(final Connection oldConnection) {
 114  4
         final ConnectionInfo<Server> info = reconnectPrimary();
 115  4
         if (info != null) {
 116  2
             return new ReplicaSetConnection(info.getConnection(),
 117  
                     info.getConnectionKey(), getState(),
 118  
                     getConnectionFactory(), getConfig(), this);
 119  
         }
 120  2
         return null;
 121  
     }
 122  
 
 123  
     /**
 124  
      * Searches for the primary server in the replica set. This will
 125  
      * only continue until the
 126  
      * {@link MongoClientConfiguration#getReconnectTimeout()} has expired.
      * A reconnect timeout of zero or less is treated as no deadline.
 127  
      * 
 128  
      * @return The information for the primary connection or null if the
 129  
      *         reconnect fails.
 130  
      */
 131  
     public synchronized ConnectionInfo<Server> reconnectPrimary() {
 132  4
         LOG.debug("Trying replica set reconnect.");
 133  4
         final Cluster state = getState();
 134  
 
 135  
         // Figure out a deadline for the reconnect (none if the timeout is <= 0).
 136  4
         final int wait = getConfig().getReconnectTimeout();
 137  4
         long now = System.currentTimeMillis();
 138  4
         final long deadline = (wait <= 0) ? Long.MAX_VALUE : (now + wait);
 139  
 
 140  4
         final Map<Server, Future<Reply>> answers = new HashMap<Server, Future<Reply>>();
 141  4
         final Map<Server, Connection> connections = new HashMap<Server, Connection>();
 142  
 
 143  
         // Clear any pending interrupt. It is re-asserted in the finally block.
 144  4
         final boolean interrupted = Thread.interrupted();
 145  
         try {
 146  
             // First try a simple reconnect to the servers currently listed as writable.
 147  4
             for (final Server writable : state.getWritableServers()) {
 148  0
                 if (verifyPutative(answers, connections, writable, deadline)) {
 149  0
                     LOG.debug("New primary for replica set: {}.",
 150  
                             writable.getCanonicalName());
 151  0
                     return createReplicaSetConnection(connections, writable);
 152  
                 }
 153  0
             }
 154  
 
 155  
             // How much time to pause for replies and waiting for a server
 156  
             // to become primary. Doubled after each pass, up to the maximum.
 157  4
             int pauseTime = INITIAL_RECONNECT_PAUSE_TIME_MS;
 158  14
             while (now < deadline) {
 159  
                 // Ask all of the servers who they think the primary is.
 160  12
                 for (final Server server : state.getServers()) {
 161  
 
 162  36
                     sendIsPrimary(answers, connections, server, false);
 163  
 
 164  
                     // Anyone replied yet?
 165  36
                     final ConnectionInfo<Server> newConn = checkForReply(state,
 166  
                             answers, connections, deadline);
 167  36
                     if (newConn != null) {
 168  0
                         return newConn;
 169  
                     }
 170  
 
 171  
                     // Loop to the next server.
 172  36
                 }
 173  
 
 174  
                 // Wait for a beat for a reply or a server to decide to be
 175  
                 // master.
 176  12
                 sleep(pauseTime, MILLISECONDS);
 177  12
                 pauseTime = Math.min(MAX_RECONNECT_PAUSE_TIME_MS, pauseTime
 178  
                         + pauseTime);
 179  
 
 180  
                 // Check again for replies before trying to reconnect.
 181  12
                 final ConnectionInfo<Server> newConn = checkForReply(state,
 182  
                         answers, connections, deadline);
 183  12
                 if (newConn != null) {
 184  2
                     return newConn;
 185  
                 }
 186  
 
 187  10
                 now = System.currentTimeMillis();
 188  10
             }
 189  
         }
 190  
         finally {
 191  
             // Shut down the connections we created. A returned primary's
             // connection has already been removed from the map, so it stays open.
 192  4
             for (final Connection conn : connections.values()) {
 193  9
                 conn.shutdown(true);
 194  9
             }
 195  4
             if (interrupted) {
 196  0
                 Thread.currentThread().interrupt();
 197  
             }
 198  
         }
 199  2
         return null;
 200  
     }
 201  
 
 202  
     /**
 203  
      * Checks for a reply from a server. If one has been received then it tries
 204  
      * to confirm the primary server by asking it if it thinks it is the primary
 205  
      * server.
 206  
      * 
 207  
      * @param state
 208  
      *            The state of the cluster.
 209  
      * @param answers
 210  
      *            The pending ({@link Future}) answers from each server.
      *            Completed answers are removed from this map.
 211  
      * @param connections
 212  
      *            The connection to each server.
 213  
      * @param deadline
 214  
      *            The deadline for the reconnect attempt.
 215  
      * @return The new connection if there was a reply and that server confirmed
 216  
      *         it was the primary, null otherwise.
 217  
      */
 218  
     protected ConnectionInfo<Server> checkForReply(final Cluster state,
 219  
             final Map<Server, Future<Reply>> answers,
 220  
             final Map<Server, Connection> connections, final long deadline) {
         // Iterate over a snapshot: the answers map is modified inside the loop.
 221  48
         final Map<Server, Future<Reply>> copy = new HashMap<Server, Future<Reply>>(
 222  
                 answers);
 223  48
         for (final Map.Entry<Server, Future<Reply>> entry : copy.entrySet()) {
 224  
 
 225  98
             final Server server = entry.getKey();
 226  98
             final Future<Reply> reply = entry.getValue();
 227  
 
 228  98
             if (reply.isDone()) {
 229  
                 // Remove this reply so the server can be asked again later.
 230  34
                 answers.remove(server);
 231  
 
 232  
                 // Check the result: the name of the server it reports as primary.
 233  34
                 final String putative = checkReply(reply, connections, server,
 234  
                         deadline);
 235  
 
 236  
                 // Phase2 - Verify the putative server.
 237  34
                 if (putative != null) {
 238  7
                     final Server putativeServer = getState().get(putative);
 239  7
                     if (verifyPutative(answers, connections, putativeServer,
 240  
                             deadline)) {
 241  
 
 242  
                         // Phase 3 - Setup a new replica set connection to the
 243  
                         // primary and seed it with a secondary if there is a
 244  
                         // suitable server.
 245  2
                         LOG.info("New primary for replica set: {}", putative);
 246  2
                         updateUnknown(state, answers, connections);
 247  2
                         return createReplicaSetConnection(connections,
 248  
                                 putativeServer);
 249  
                     }
 250  
                 }
 251  32
             }
 252  
             else {
 253  64
                 LOG.debug("No reply yet from {}.", server);
 254  
             }
 255  96
         }
 256  
 
 257  46
         return null;
 258  
     }
 259  
 
 260  
     /**
 261  
      * Extracts who the server thinks is the primary from the reply.
 262  
      * 
 263  
      * @param replyFuture
 264  
      *            The future to get the reply from. May be null.
 265  
      * @param connections
 266  
      *            The map of connections. The server's connection is removed and
 267  
      *            closed on a timeout or an execution error.
 268  
      * @param server
 269  
      *            The server.
 270  
      * @param deadline
 271  
      *            The deadline for the reconnect attempt.
 272  
      * @return The name of the server the reply indicates is the primary, null
 273  
      *         if there is no primary or any error.
 274  
      */
 275  
     protected String checkReply(final Future<Reply> replyFuture,
 276  
             final Map<Server, Connection> connections, final Server server,
 277  
             final long deadline) {
 278  41
         if (replyFuture != null) {
 279  
             try {
                 // Wait no longer than the time remaining before the deadline.
 280  41
                 final Reply reply = replyFuture.get(
 281  
                         Math.max(0, deadline - System.currentTimeMillis()),
 282  
                         TimeUnit.MILLISECONDS);
 283  
 
 284  39
                 final List<Document> results = reply.getResults();
 285  39
                 if (!results.isEmpty()) {
 286  39
                     final Document doc = results.get(0);
 287  
 
 288  
                     // Get the name of the primary server.
 289  39
                     final Element primary = doc.get("primary");
 290  39
                     if (primary instanceof StringElement) {
 291  9
                         return ((StringElement) primary).getValue();
 292  
                     }
 293  
                 }
 294  
             }
 295  0
             catch (final InterruptedException e) {
 296  
                 // Just ignore the reply. The interrupt is not re-asserted here.
 297  
             }
 298  2
             catch (final TimeoutException e) {
 299  
                 // Kill the associated connection.
 300  2
                 final Connection conn = connections.remove(server);
 301  2
                 IOUtils.close(conn);
 302  
             }
 303  0
             catch (final ExecutionException e) {
 304  
                 // Kill the associated connection.
 305  0
                 final Connection conn = connections.remove(server);
 306  0
                 IOUtils.close(conn);
 307  32
             }
 308  
         }
 309  32
         return null;
 310  
     }
 311  
 
 312  
     /**
 313  
      * Sends a command to the server to return what it thinks the state of the
 314  
      * cluster is. This method will not re-request the information from the
 315  
      * server if there is already an outstanding request.
 316  
      * 
 317  
      * @param answers
 318  
      *            The pending ({@link Future}) answers from each server.
 319  
      * @param connections
 320  
      *            The connection to each server. A new connection is created
      *            and stored here if there is none or it is not available.
 321  
      * @param server
 322  
      *            The server to send the request to.
 323  
      * @param isPrimary
 324  
      *            If true logs the first connection error as a warning. Debug otherwise.
 325  
      * @return The future reply for the request sent to the server, the already
      *         outstanding future if there is one, or null if an
      *         {@link IOException} was thrown.
 326  
      */
 327  
     protected Future<Reply> sendIsPrimary(
 328  
             final Map<Server, Future<Reply>> answers,
 329  
             final Map<Server, Connection> connections, final Server server,
 330  
             final boolean isPrimary) {
 331  43
         Future<Reply> reply = null;
 332  
         try {
 333  
             // Locate a connection to the server, creating one if needed.
 334  43
             Connection conn = connections.get(server);
 335  43
             if ((conn == null) || !conn.isAvailable()) {
 336  13
                 conn = getConnectionFactory().connect(server, getConfig());
 337  13
                 connections.put(server, conn);
 338  
             }
 339  
 
 340  
             // Only send to the server if there is not an outstanding
 341  
             // request.
 342  43
             reply = answers.get(server);
 343  43
             if (reply == null) {
 344  43
                 LOG.debug("Sending reconnect(rs) query to {}.",
 345  
                         server.getCanonicalName());
 346  
 
 347  43
                 final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
 348  
                         server);
 349  43
                 conn.send(new IsMaster(), replyCallback);
 350  
 
 351  43
                 reply = replyCallback;
 352  43
                 answers.put(server, reply);
 353  
 
                 // The request went out without an IOException, so a later
                 // failure for this server will be warned about again.
 354  43
                 myDeadServers.remove(server);
 355  
             }
 356  
         }
 357  0
         catch (final IOException e) {
 358  
             // Nothing to do for now. Log at a debug level if this is not the
 359  
             // primary. Warn if we think it is the primary (and have not warned
 360  
             // before)
 361  0
             final Level level = (isPrimary && myDeadServers.add(server)) ? Level.WARNING
 362  
                     : Level.FINE;
 363  0
             LOG.log(level, e, "Cannot create a connection to '{}'.", server);
 364  43
         }
 365  
 
 366  43
         return reply;
 367  
     }
 368  
 
 369  
     /**
 370  
      * Sleeps without throwing an exception. An interrupt ends the sleep early.
 371  
      * 
 372  
      * @param sleepTime
 373  
      *            The amount of time to sleep.
 374  
      * @param units
 375  
      *            The units for the amount of time to sleep.
 376  
      */
 377  
     protected void sleep(final int sleepTime, final TimeUnit units) {
 378  
         try {
 379  12
             units.sleep(sleepTime);
 380  
         }
 381  0
         catch (final InterruptedException e) {
 382  
             // Ignore. The interrupt is swallowed and not re-asserted here.
 383  12
         }
 384  12
     }
 385  
 
 386  
     /**
 387  
      * Tries to verify that the suspected primary server is in fact the primary
 388  
      * server by asking it directly and synchronously.
 389  
      * 
 390  
      * @param answers
 391  
      *            The pending ({@link Future}) answers from each server.
 392  
      * @param connections
 393  
      *            The connection to each server.
 394  
      * @param putativePrimary
 395  
      *            The server we think is the primary.
 396  
      * @param deadline
 397  
      *            The deadline for the reconnect attempt.
 398  
      * @return True if the server reports its own canonical name as the primary.
 399  
      */
 400  
     protected boolean verifyPutative(final Map<Server, Future<Reply>> answers,
 401  
             final Map<Server, Connection> connections,
 402  
             final Server putativePrimary, final long deadline) {
 403  
 
 404  7
         LOG.debug("Verify putative server ({}) on reconnect(rs).",
 405  
                 putativePrimary);
 406  
 
 407  
         // Make sure we send a new request. The old reply might have been
 408  
         // before becoming the primary.
 409  7
         answers.remove(putativePrimary);
 410  
 
 411  
         // If the primary agrees that they are the primary then it is
 412  
         // probably true.
 413  7
         final Future<Reply> reply = sendIsPrimary(answers, connections,
 414  
                 putativePrimary, true);
 415  7
         final String primary = checkReply(reply, connections, putativePrimary,
 416  
                 deadline);
 417  7
         if (putativePrimary.getCanonicalName().equals(primary)) {
 418  2
             return true;
 419  
         }
 420  
 
 421  5
         return false;
 422  
     }
 423  
 
 424  
     /**
 425  
      * Creates the {@link ConnectionInfo} for the primary server.
 426  
      * 
 427  
      * @param connections
 428  
      *            The connections that are being managed. The primary's connection is removed from this map.
 429  
      * @param primaryServer
 430  
      *            The primary server.
 431  
      * @return The {@link ConnectionInfo} pairing the primary's connection with the primary server.
 432  
      */
 433  
     private ConnectionInfo<Server> createReplicaSetConnection(
 434  
             final Map<Server, Connection> connections,
 435  
             final Server primaryServer) {
 436  2
         final Connection primaryConn = connections.remove(primaryServer);
 437  
 
 438  2
         return new ConnectionInfo<Server>(primaryConn, primaryServer);
 439  
     }
 440  
 
 441  
     /**
 442  
      * Tries to send messages to all of the members of the cluster in an
 443  
      * indeterminate state (UNKNOWN or UNAVAILABLE). Any pending answer for such a server is discarded first.
 444  
      * 
 445  
      * @param state
 446  
      *            The state of the cluster.
 447  
      * @param answers
 448  
      *            The pending responses.
 449  
      * @param connections
 450  
      *            The connections already created.
 451  
      */
 452  
     private void updateUnknown(final Cluster state,
 453  
             final Map<Server, Future<Reply>> answers,
 454  
             final Map<Server, Connection> connections) {
 455  2
         for (final Server server : state.getServers()) {
 456  6
             switch (server.getState()) {
 457  
             case UNKNOWN: // Fall through.
 458  
             case UNAVAILABLE: {
 459  0
                 answers.remove(server);
 460  0
                 sendIsPrimary(answers, connections, server, false);
 461  0
                 break;
 462  
             }
 463  
             case READ_ONLY:
 464  
             case WRITABLE:
 465  
             default: {
 466  
                 // Known good. Nothing to send.
 467  
                 break;
 468  
             }
 469  
             }
 470  6
         }
 471  2
     }
 472  
 }