Coverage Report - com.allanbank.mongodb.client.connection.socket.SocketConnectionFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
SocketConnectionFactory
78%
52/66
50%
8/16
2.4
SocketConnectionFactory$1
100%
1/1
N/A
2.4
SocketConnectionFactory$ConfigurationListener
33%
1/3
N/A
2.4
 
 1  
 /*
 2  
  * #%L
 3  
  * SocketConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.connection.socket;
 21  
 
 22  
 import java.beans.PropertyChangeEvent;
 23  
 import java.beans.PropertyChangeListener;
 24  
 import java.io.IOException;
 25  
 import java.lang.ref.Reference;
 26  
 import java.net.InetSocketAddress;
 27  
 import java.util.ArrayList;
 28  
 import java.util.Collections;
 29  
 import java.util.List;
 30  
 import java.util.concurrent.ExecutionException;
 31  
 
 32  
 import com.allanbank.mongodb.MongoClientConfiguration;
 33  
 import com.allanbank.mongodb.Version;
 34  
 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
 35  
 import com.allanbank.mongodb.bson.io.StringDecoderCache;
 36  
 import com.allanbank.mongodb.bson.io.StringEncoderCache;
 37  
 import com.allanbank.mongodb.client.ClusterStats;
 38  
 import com.allanbank.mongodb.client.ClusterType;
 39  
 import com.allanbank.mongodb.client.connection.Connection;
 40  
 import com.allanbank.mongodb.client.connection.ConnectionFactory;
 41  
 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
 42  
 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
 43  
 import com.allanbank.mongodb.client.message.IsMaster;
 44  
 import com.allanbank.mongodb.client.state.Cluster;
 45  
 import com.allanbank.mongodb.client.state.LatencyServerSelector;
 46  
 import com.allanbank.mongodb.client.state.Server;
 47  
 import com.allanbank.mongodb.client.state.ServerSelector;
 48  
 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
 49  
 import com.allanbank.mongodb.client.state.SimpleReconnectStrategy;
 50  
 import com.allanbank.mongodb.util.log.Log;
 51  
 import com.allanbank.mongodb.util.log.LogFactory;
 52  
 
 53  
 /**
 54  
  * {@link ConnectionFactory} to create direct socket connections to the servers.
 55  
  * 
 56  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 57  
  *         mutated in incompatible ways between any two releases of the driver.
 58  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 59  
  */
 60  
 public class SocketConnectionFactory implements ProxiedConnectionFactory {
 61  
 
 62  
     /** The logger for the factory. */
 63  1
     private static final Log LOG = LogFactory
 64  
             .getLog(SocketConnectionFactory.class);
 65  
 
 66  
     /**
 67  
      * The buffers used by the single threaded connections. Each buffer is
 68  
      * shared by all connections but there can be up to 1 buffer per application
 69  
      * thread. We use a reference to the buffer to allow the garbage collector
 70  
      * To clean up the stream.
 71  
      */
 72  
     private ThreadLocal<Reference<BufferingBsonOutputStream>> myBuffers;
 73  
 
 74  
     /** The state of the cluster. */
 75  
     private final Cluster myCluster;
 76  
 
 77  
     /** The MongoDB client configuration. */
 78  
     private final MongoClientConfiguration myConfig;
 79  
 
 80  
     /** The MongoDB client configuration. */
 81  
     private final ConfigurationListener myConfigListener;
 82  
 
 83  
     /** Cache used for decoding strings. */
 84  
     private final StringDecoderCache myDecoderCache;
 85  
 
 86  
     /** Cache used for encoding strings. */
 87  
     private final StringEncoderCache myEncoderCache;
 88  
 
 89  
     /** The server selector. */
 90  
     private final ServerSelector myServerSelector;
 91  
 
 92  
     /**
 93  
      * Creates a new {@link SocketConnectionFactory}.
 94  
      * 
 95  
      * @param config
 96  
      *            The MongoDB client configuration.
 97  
      */
 98  
     public SocketConnectionFactory(final MongoClientConfiguration config) {
 99  37
         super();
 100  37
         myConfig = config;
 101  37
         myCluster = new Cluster(config, ClusterType.STAND_ALONE);
 102  37
         myServerSelector = new LatencyServerSelector(myCluster, true);
 103  37
         myBuffers = new ThreadLocal<Reference<BufferingBsonOutputStream>>();
 104  
 
 105  37
         myConfigListener = new ConfigurationListener();
 106  37
         myConfig.addPropertyChangeListener(myConfigListener);
 107  
 
 108  37
         myDecoderCache = new StringDecoderCache();
 109  37
         myDecoderCache.setMaxCacheEntries(config.getMaxCachedStringEntries());
 110  37
         myDecoderCache.setMaxCacheLength(config.getMaxCachedStringLength());
 111  
 
 112  37
         myEncoderCache = new StringEncoderCache();
 113  37
         myEncoderCache.setMaxCacheEntries(config.getMaxCachedStringEntries());
 114  37
         myEncoderCache.setMaxCacheLength(config.getMaxCachedStringLength());
 115  37
     }
 116  
 
 117  
     /**
 118  
      * {@inheritDoc}
 119  
      * <p>
 120  
      * Overridden to do nothing.
 121  
      * </p>
 122  
      */
 123  
     @Override
 124  
     public void close() {
 125  34
         myBuffers = null; // Let the ThreadLocal's weak reference go.
 126  34
         myConfig.removePropertyChangeListener(myConfigListener);
 127  
 
 128  
         // Release the cached entries too.
 129  34
         myDecoderCache.setMaxCacheEntries(0);
 130  34
         myDecoderCache.setMaxCacheLength(0);
 131  34
     }
 132  
 
 133  
     /**
 134  
      * {@inheritDoc}
 135  
      * <p>
 136  
      * Returns a new {@link SocketConnection}.
 137  
      * </p>
 138  
      * 
 139  
      * @see ConnectionFactory#connect()
 140  
      */
 141  
     @Override
 142  
     public Connection connect() throws IOException {
 143  5
         final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>(
 144  
                 myConfig.getServerAddresses());
 145  
 
 146  
         // Shuffle the servers and try to connect to each until one works.
 147  5
         IOException last = null;
 148  5
         Collections.shuffle(servers);
 149  5
         for (final InetSocketAddress address : servers) {
 150  
             try {
 151  4
                 final Server server = myCluster.add(address);
 152  4
                 final Connection conn = connect(server, myConfig);
 153  
 
 154  
                 // Get the state of the server updated.
 155  3
                 final ServerUpdateCallback cb = new ServerUpdateCallback(server);
 156  3
                 conn.send(new IsMaster(), cb);
 157  
 
 158  3
                 if (Version.UNKNOWN.equals(server.getVersion())) {
 159  
                     // If we don't know the version then wait for that response.
 160  
                     try {
 161  1
                         cb.get();
 162  
                     }
 163  0
                     catch (final ExecutionException e) {
 164  
                         // Probably not in a good state...
 165  0
                         LOG.debug(e, "Could not execute an 'ismaster' command.");
 166  
                     }
 167  0
                     catch (final InterruptedException e) {
 168  
                         // Probably not in a good state...
 169  0
                         LOG.debug(e, "Could not execute an 'ismaster' command.");
 170  1
                     }
 171  
                 }
 172  
 
 173  3
                 return conn;
 174  
             }
 175  1
             catch (final IOException error) {
 176  1
                 last = error;
 177  
             }
 178  1
         }
 179  
 
 180  2
         if (last != null) {
 181  1
             throw last;
 182  
         }
 183  1
         throw new IOException("Could not connect to any server: " + servers);
 184  
     }
 185  
 
 186  
     /**
 187  
      * Creates a connection to the address provided.
 188  
      * 
 189  
      * @param server
 190  
      *            The MongoDB server to connect to.
 191  
      * @param config
 192  
      *            The configuration for the Connection to the MongoDB server.
 193  
      * @return The Connection to MongoDB.
 194  
      * @throws IOException
 195  
      *             On a failure connecting to the server.
 196  
      */
 197  
     @Override
 198  
     public Connection connect(final Server server,
 199  
             final MongoClientConfiguration config) throws IOException {
 200  
 
 201  
         final AbstractSocketConnection connection;
 202  
 
 203  107
         switch (myConfig.getConnectionModel()) {
 204  
         case SENDER_RECEIVER_THREAD: {
 205  1
             connection = new TwoThreadSocketConnection(server, myConfig,
 206  
                     myEncoderCache, myDecoderCache);
 207  1
             break;
 208  
         }
 209  
         default: { // and RECEIVER_THREAD
 210  106
             connection = new SocketConnection(server, myConfig, myEncoderCache,
 211  
                     myDecoderCache, myBuffers);
 212  
             break;
 213  
         }
 214  
         }
 215  
 
 216  
         // Start the connection.
 217  85
         connection.start();
 218  
 
 219  85
         return connection;
 220  
     }
 221  
 
 222  
     /**
 223  
      * Returns the cluster state.
 224  
      * 
 225  
      * @return The cluster state.
 226  
      */
 227  
     public Cluster getCluster() {
 228  11
         return myCluster;
 229  
     }
 230  
 
 231  
     /**
 232  
      * {@inheritDoc}
 233  
      * <p>
 234  
      * Overridden to return the {@link Cluster}.
 235  
      * </p>
 236  
      */
 237  
     @Override
 238  
     public ClusterStats getClusterStats() {
 239  0
         return myCluster;
 240  
     }
 241  
 
 242  
     /**
 243  
      * {@inheritDoc}
 244  
      * <p>
 245  
      * Overridden to return {@link ClusterType#STAND_ALONE} cluster type.
 246  
      * </p>
 247  
      */
 248  
     @Override
 249  
     public ClusterType getClusterType() {
 250  3
         return ClusterType.STAND_ALONE;
 251  
     }
 252  
 
 253  
     /**
 254  
      * {@inheritDoc}
 255  
      * <p>
 256  
      * Overridden to return a {@link SimpleReconnectStrategy}.
 257  
      * </p>
 258  
      */
 259  
     @Override
 260  
     public ReconnectStrategy getReconnectStrategy() {
 261  5
         final SimpleReconnectStrategy strategy = new SimpleReconnectStrategy();
 262  5
         strategy.setConfig(myConfig);
 263  5
         strategy.setConnectionFactory(this);
 264  5
         strategy.setSelector(myServerSelector);
 265  5
         strategy.setState(myCluster);
 266  
 
 267  5
         return strategy;
 268  
     }
 269  
 
 270  
     /**
 271  
      * Notification of a change to the configuration of the client.
 272  
      * 
 273  
      * @param evt
 274  
      *            The details of the configuration change.
 275  
      */
 276  
     protected void configurationChanged(final PropertyChangeEvent evt) {
 277  0
         final String name = evt.getPropertyName();
 278  0
         final Object value = evt.getNewValue();
 279  0
         if ("maxCachedStringEntries".equals(name) && (value instanceof Number)) {
 280  0
             myDecoderCache.setMaxCacheEntries(((Number) value).intValue());
 281  0
             myEncoderCache.setMaxCacheEntries(((Number) value).intValue());
 282  
         }
 283  0
         else if ("maxCachedStringLength".equals(name)
 284  
                 && (value instanceof Number)) {
 285  0
             myDecoderCache.setMaxCacheLength(((Number) value).intValue());
 286  0
             myEncoderCache.setMaxCacheLength(((Number) value).intValue());
 287  
         }
 288  0
     }
 289  
 
 290  
     /**
 291  
      * ConfigurationListener provides a listener for changes in the client's
 292  
      * configuration.
 293  
      * 
 294  
      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
 295  
      *         be mutated in incompatible ways between any two releases of the
 296  
      *         driver.
 297  
      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 298  
      */
 299  37
     protected final class ConfigurationListener implements
 300  
             PropertyChangeListener {
 301  
         @Override
 302  
         public void propertyChange(final PropertyChangeEvent evt) {
 303  0
             configurationChanged(evt);
 304  0
         }
 305  
     }
 306  
 }