Coverage Report - com.allanbank.mongodb.client.connection.sharded.ShardedConnectionFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
ShardedConnectionFactory
97%
80/82
92%
24/26
2.053
ShardedConnectionFactory$BootstrapState
100%
7/7
N/A
2.053
 
 1  
 /*
 2  
  * #%L
 3  
  * ShardedConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.connection.sharded;
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.net.InetSocketAddress;
 24  
 import java.util.List;
 25  
 import java.util.concurrent.ExecutionException;
 26  
 import java.util.logging.Level;
 27  
 
 28  
 import com.allanbank.mongodb.MongoClientConfiguration;
 29  
 import com.allanbank.mongodb.MongoDbException;
 30  
 import com.allanbank.mongodb.ReadPreference;
 31  
 import com.allanbank.mongodb.bson.Document;
 32  
 import com.allanbank.mongodb.bson.Element;
 33  
 import com.allanbank.mongodb.bson.element.StringElement;
 34  
 import com.allanbank.mongodb.builder.Find;
 35  
 import com.allanbank.mongodb.client.ClusterStats;
 36  
 import com.allanbank.mongodb.client.ClusterType;
 37  
 import com.allanbank.mongodb.client.Message;
 38  
 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
 39  
 import com.allanbank.mongodb.client.connection.Connection;
 40  
 import com.allanbank.mongodb.client.connection.ConnectionFactory;
 41  
 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
 42  
 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
 43  
 import com.allanbank.mongodb.client.message.GetMore;
 44  
 import com.allanbank.mongodb.client.message.Query;
 45  
 import com.allanbank.mongodb.client.message.Reply;
 46  
 import com.allanbank.mongodb.client.state.Cluster;
 47  
 import com.allanbank.mongodb.client.state.ClusterPinger;
 48  
 import com.allanbank.mongodb.client.state.LatencyServerSelector;
 49  
 import com.allanbank.mongodb.client.state.Server;
 50  
 import com.allanbank.mongodb.client.state.ServerSelector;
 51  
 import com.allanbank.mongodb.util.IOUtils;
 52  
 import com.allanbank.mongodb.util.log.Log;
 53  
 import com.allanbank.mongodb.util.log.LogFactory;
 54  
 
 55  
 /**
 56  
  * Provides the ability to create connections to a shard configuration via
 57  
  * mongos servers.
 58  
  * 
 59  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 60  
  *         mutated in incompatible ways between any two releases of the driver.
 61  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 62  
  */
 63  
 public class ShardedConnectionFactory implements ConnectionFactory {
 64  
 
 65  
     /** The logger for the {@link ShardedConnectionFactory}. */
 66  1
     protected static final Log LOG = LogFactory
 67  
             .getLog(ShardedConnectionFactory.class);
 68  
 
 69  
     /** The state of the cluster. */
 70  
     protected final Cluster myCluster;
 71  
 
 72  
     /** The MongoDB client configuration. */
 73  
     protected final MongoClientConfiguration myConfig;
 74  
 
 75  
     /** The factory to create proxied connections. */
 76  
     protected final ProxiedConnectionFactory myConnectionFactory;
 77  
 
 78  
     /** Pings the servers in the cluster collecting latency and tags. */
 79  
     protected final ClusterPinger myPinger;
 80  
 
 81  
     /** The selector for the mongos instance to use. */
 82  
     protected final ServerSelector mySelector;
 83  
 
 84  
     /**
 85  
      * Creates a new {@link ShardedConnectionFactory}.
 86  
      * 
 87  
      * @param factory
 88  
      *            The factory to create proxied connections.
 89  
      * @param config
 90  
      *            The initial configuration.
 91  
      */
 92  
     public ShardedConnectionFactory(final ProxiedConnectionFactory factory,
 93  16
             final MongoClientConfiguration config) {
 94  16
         myConnectionFactory = factory;
 95  16
         myConfig = config;
 96  16
         myCluster = createCluster(config);
 97  16
         mySelector = createSelector();
 98  16
         myPinger = createClusterPinger(factory, config);
 99  
 
 100  
         // Add all of the servers to the cluster.
 101  16
         for (final InetSocketAddress address : config.getServerAddresses()) {
 102  16
             myCluster.add(address);
 103  16
         }
 104  
 
 105  16
         bootstrap();
 106  16
     }
 107  
 
 108  
     /**
 109  
      * Finds the mongos servers.
 110  
      */
 111  
     public void bootstrap() {
 112  16
         final BootstrapState state = createBootstrapState();
 113  16
         if (!state.done()) {
 114  14
             for (final Server addr : myCluster.getServers()) {
 115  13
                 Connection conn = null;
 116  
                 try {
 117  
                     // Send the request...
 118  13
                     conn = myConnectionFactory.connect(addr, myConfig);
 119  
 
 120  12
                     update(state, conn);
 121  
 
 122  8
                     if (state.done()) {
 123  
                         break;
 124  
                     }
 125  
                 }
 126  1
                 catch (final IOException ioe) {
 127  1
                     LOG.warn(ioe, "I/O error during sharded bootstrap to {}.",
 128  
                             addr);
 129  
                 }
 130  2
                 catch (final MongoDbException me) {
 131  2
                     LOG.warn(me,
 132  
                             "MongoDB error during sharded bootstrap to {}.",
 133  
                             addr);
 134  
                 }
 135  1
                 catch (final InterruptedException e) {
 136  1
                     LOG.warn(e, "Interrupted during sharded bootstrap to {}.",
 137  
                             addr);
 138  
                 }
 139  1
                 catch (final ExecutionException e) {
 140  1
                     LOG.warn(e, "Error during sharded bootstrap to {}.", addr);
 141  
                 }
 142  
                 finally {
 143  10
                     IOUtils.close(conn, Level.WARNING,
 144  
                             "I/O error shutting down sharded bootstrap connection to "
 145  
                                     + addr + ".");
 146  8
                 }
 147  8
             }
 148  
         }
 149  
 
 150  
         // Last thing is to start the ping of servers. This will get the tags
 151  
         // and latencies updated.
 152  16
         myPinger.initialSweep(myCluster);
 153  16
         myPinger.start();
 154  16
     }
 155  
 
 156  
     /**
 157  
      * {@inheritDoc}
 158  
      * <p>
 159  
      * Overridden to close the cluster state and the
 160  
      * {@link ProxiedConnectionFactory}.
 161  
      * </p>
 162  
      */
 163  
     @Override
 164  
     public void close() {
 165  17
         IOUtils.close(myPinger);
 166  17
         IOUtils.close(myConnectionFactory);
 167  17
     }
 168  
 
 169  
     /**
 170  
      * Creates a new connection to the shared mongos servers.
 171  
      * 
 172  
      * @see ConnectionFactory#connect()
 173  
      */
 174  
     @Override
 175  
     public Connection connect() throws IOException {
 176  5
         IOException lastError = null;
 177  5
         for (final Server server : mySelector.pickServers()) {
 178  
             try {
 179  3
                 final Connection primaryConn = myConnectionFactory.connect(
 180  
                         server, myConfig);
 181  
 
 182  2
                 return wrap(primaryConn, server);
 183  
             }
 184  1
             catch (final IOException e) {
 185  1
                 lastError = e;
 186  
             }
 187  1
         }
 188  
 
 189  3
         if (lastError != null) {
 190  1
             throw lastError;
 191  
         }
 192  
 
 193  2
         throw new IOException(
 194  
                 "Could not determine a shard server to connect to.");
 195  
     }
 196  
 
 197  
     /**
 198  
      * {@inheritDoc}
 199  
      * <p>
 200  
      * Overridden to return the {@link Cluster}.
 201  
      * </p>
 202  
      */
 203  
     @Override
 204  
     public ClusterStats getClusterStats() {
 205  0
         return myCluster;
 206  
     }
 207  
 
 208  
     /**
 209  
      * {@inheritDoc}
 210  
      * <p>
 211  
      * Overridden to return {@link ClusterType#SHARDED} cluster type.
 212  
      * </p>
 213  
      */
 214  
     @Override
 215  
     public ClusterType getClusterType() {
 216  2
         return ClusterType.SHARDED;
 217  
     }
 218  
 
 219  
     /**
 220  
      * {@inheritDoc}
 221  
      * <p>
 222  
      * Overridden to return the delegates strategy but replace his state and
 223  
      * selector with our own.
 224  
      * </p>
 225  
      */
 226  
     @Override
 227  
     public ReconnectStrategy getReconnectStrategy() {
 228  2
         final ReconnectStrategy delegates = myConnectionFactory
 229  
                 .getReconnectStrategy();
 230  
 
 231  2
         delegates.setState(myCluster);
 232  2
         delegates.setSelector(mySelector);
 233  2
         delegates.setConnectionFactory(myConnectionFactory);
 234  
 
 235  2
         return delegates;
 236  
     }
 237  
 
 238  
     /**
 239  
      * Creates a new {@link BootstrapState}.
 240  
      * 
 241  
      * @return The {@link BootstrapState} to track state of loading the cluster
 242  
      *         information.
 243  
      */
 244  
     protected BootstrapState createBootstrapState() {
 245  16
         return new BootstrapState(!myConfig.isAutoDiscoverServers());
 246  
     }
 247  
 
 248  
     /**
 249  
      * Creates a {@link Cluster} object to track the state of the servers across
 250  
      * the cluster.
 251  
      * 
 252  
      * @param config
 253  
      *            The configuration for the cluster.
 254  
      * @return The {@link Cluster} to track the servers across the cluster.
 255  
      */
 256  
     protected Cluster createCluster(final MongoClientConfiguration config) {
 257  16
         return new Cluster(config, ClusterType.SHARDED);
 258  
     }
 259  
 
 260  
     /**
 261  
      * Creates a {@link ClusterPinger} object to periodically update the status
 262  
      * of the servers.
 263  
      * 
 264  
      * @param factory
 265  
      *            The factory for creating the connections to the servers.
 266  
      * @param config
 267  
      *            The configuration for the client.
 268  
      * 
 269  
      * @return The {@link ClusterPinger} object to periodically update the
 270  
      *         status of the servers.
 271  
      */
 272  
     protected ClusterPinger createClusterPinger(
 273  
             final ProxiedConnectionFactory factory,
 274  
             final MongoClientConfiguration config) {
 275  16
         return new ClusterPinger(myCluster, factory, config);
 276  
     }
 277  
 
 278  
     /**
 279  
      * Creates a {@link ServerSelector} object to select the (presumed) optimal
 280  
      * server to handle a request.
 281  
      * <p>
 282  
      * For a sharded cluster this defaults to the {@link LatencyServerSelector}.
 283  
      * </p>
 284  
      * 
 285  
      * @return The {@link ServerSelector} object to select the (presumed)
 286  
      *         optimal server to handle a request.
 287  
      */
 288  
     protected ServerSelector createSelector() {
 289  16
         return new LatencyServerSelector(myCluster, true);
 290  
     }
 291  
 
 292  
     /**
 293  
      * Performs a find on the <tt>config</tt> database's <tt>mongos</tt>
 294  
      * collection to return the id for all of the mongos servers in the cluster.
 295  
      * <p>
 296  
      * A single mongos entry looks like: <blockquote>
 297  
      * 
 298  
      * <pre>
 299  
      * <code>
 300  
      * {
 301  
      *     "_id" : "mongos.example.com:27017",
 302  
      *     "ping" : ISODate("2011-12-05T23:54:03.122Z"),
 303  
      *     "up" : 330
 304  
      * }
 305  
      * </code>
 306  
      * </pre>
 307  
      * 
 308  
      * </blockquote>
 309  
      * 
 310  
      * @param conn
 311  
      *            The connection to request from.
 312  
      * @return True if the configuration servers have been determined.
 313  
      * @throws ExecutionException
 314  
      *             On a failure to recover the response from the server.
 315  
      * @throws InterruptedException
 316  
      *             On a failure to receive a response from the server.
 317  
      */
 318  
     protected boolean findMongosServers(final Connection conn)
 319  
             throws InterruptedException, ExecutionException {
 320  12
         boolean found = false;
 321  
 
 322  
         // Create a query to pull all of the mongos servers out of the
 323  
         // config database.
 324  12
         Message message = new Query("config", "mongos", Find.ALL,
 325  
         /* fields= */null, /* batchSize= */0,
 326  
         /* limit= */0, /* numberToSkip= */0, /* tailable= */false,
 327  
                 ReadPreference.PRIMARY, /* noCursorTimeout= */false,
 328  
                 /* awaitData= */false, /* exhaust= */false, /* partial= */
 329  
                 false);
 330  
 
 331  20
         while (message != null) {
 332  
             // Send the request...
 333  12
             final FutureReplyCallback future = new FutureReplyCallback();
 334  12
             conn.send(message, future);
 335  
 
 336  
             // Don's send it again.
 337  10
             message = null;
 338  
 
 339  
             // Receive the response.
 340  10
             final Reply reply = future.get();
 341  
 
 342  
             // Validate and pull out the response information.
 343  8
             final List<Document> docs = reply.getResults();
 344  8
             for (final Document doc : docs) {
 345  11
                 final Element idElem = doc.get("_id");
 346  11
                 if (idElem instanceof StringElement) {
 347  9
                     final StringElement id = (StringElement) idElem;
 348  
 
 349  9
                     myCluster.add(id.getValue());
 350  9
                     found = true;
 351  
 
 352  9
                     LOG.debug("Adding shard mongos: {}", id.getValue());
 353  
                 }
 354  11
             }
 355  
 
 356  
             // Cursor?
 357  8
             if (reply.getCursorId() != 0) {
 358  
                 // Send a GetMore.
 359  0
                 message = new GetMore("config", "mongos", reply.getCursorId(),
 360  
                         0, ReadPreference.PRIMARY);
 361  
             }
 362  8
         }
 363  
 
 364  8
         return found;
 365  
     }
 366  
 
 367  
     /**
 368  
      * Returns the clusterState value.
 369  
      * 
 370  
      * @return The clusterState value.
 371  
      */
 372  
     protected Cluster getCluster() {
 373  3
         return myCluster;
 374  
     }
 375  
 
 376  
     /**
 377  
      * Queries for the addresses for the {@code mongos} servers via the
 378  
      * {@link #findMongosServers(Connection)} method.
 379  
      * 
 380  
      * @param state
 381  
      *            The state of the bootstrap to be updated.
 382  
      * @param conn
 383  
      *            The connection to use to locate the {@code mongos} servers
 384  
      * @throws InterruptedException
 385  
      *             On a failure to wait for the reply to the query due to the
 386  
      *             thread being interrupted.
 387  
      * @throws ExecutionException
 388  
      *             On a failure to execute the query.
 389  
      */
 390  
     protected void update(final BootstrapState state, final Connection conn)
 391  
             throws InterruptedException, ExecutionException {
 392  12
         if (state.isMongosFound() || findMongosServers(conn)) {
 393  5
             state.setMongosFound(true);
 394  
         }
 395  8
     }
 396  
 
 397  
     /**
 398  
      * Wraps the connection in a shard-aware connection.
 399  
      * 
 400  
      * @param primaryConn
 401  
      *            The primary shard connection.
 402  
      * @param server
 403  
      *            The server the connection is connected to.
 404  
      * @return The wrapped connection.
 405  
      */
 406  
     protected Connection wrap(final Connection primaryConn, final Server server) {
 407  2
         return new ShardedConnection(primaryConn, server, myCluster,
 408  
                 mySelector, myConnectionFactory, myConfig);
 409  
     }
 410  
 
 411  
     /**
 412  
      * BootstrapState provides the ability to track the state of the bootstrap
 413  
      * for the sharded cluster.
 414  
      * 
 415  
      * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
 416  
      */
 417  
     protected static class BootstrapState {
 418  
         /** Tracks if the {@code mongos} servers have been located. */
 419  
         private boolean myMongosFound;
 420  
 
 421  
         /**
 422  
          * Creates a new BootstrapState.
 423  
          * 
 424  
          * @param mongosFound
 425  
          *            Initials if we should look for the {@code mongos} servers.
 426  
          */
 427  16
         protected BootstrapState(final boolean mongosFound) {
 428  16
             myMongosFound = mongosFound;
 429  16
         }
 430  
 
 431  
         /**
 432  
          * Indicates when the bootstrap is complete.
 433  
          * <p>
 434  
          * This method returns true if auto discovery is turned off or (if on)
 435  
          * when all of the {@code mongos} servers have been located.
 436  
          * 
 437  
          * @return True once the boot strap is complete.
 438  
          */
 439  
         public boolean done() {
 440  24
             return myMongosFound;
 441  
         }
 442  
 
 443  
         /**
 444  
          * Returns true if the {@code mongos} servers have been found, false
 445  
          * otherwise.
 446  
          * 
 447  
          * @return True if the {@code mongos} servers have been found, false
 448  
          *         otherwise.
 449  
          */
 450  
         public boolean isMongosFound() {
 451  12
             return myMongosFound;
 452  
         }
 453  
 
 454  
         /**
 455  
          * Sets if the the {@code mongos} servers have been found.
 456  
          * 
 457  
          * @param mongosFound
 458  
          *            If true, the {@code mongos} servers have been found, false
 459  
          *            otherwise.
 460  
          */
 461  
         public void setMongosFound(final boolean mongosFound) {
 462  5
             myMongosFound = mongosFound;
 463  5
         }
 464  
     }
 465  
 }