Coverage Report - com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractProxyMultipleConnection
94%
142/150
84%
59/70
2.533
AbstractProxyMultipleConnection$ClusterAndConnectionListener
100%
7/7
87%
7/8
2.533
 
 1  
 /*
 2  
  * #%L
 3  
  * AbstractProxyMultipleConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.connection.proxy;
 22  
 
 23  
 import java.beans.PropertyChangeEvent;
 24  
 import java.beans.PropertyChangeListener;
 25  
 import java.beans.PropertyChangeSupport;
 26  
 import java.io.IOException;
 27  
 import java.util.Arrays;
 28  
 import java.util.HashSet;
 29  
 import java.util.List;
 30  
 import java.util.Map;
 31  
 import java.util.Set;
 32  
 import java.util.concurrent.ConcurrentHashMap;
 33  
 import java.util.concurrent.ConcurrentMap;
 34  
 import java.util.concurrent.TimeUnit;
 35  
 import java.util.concurrent.atomic.AtomicBoolean;
 36  
 import java.util.concurrent.atomic.AtomicReference;
 37  
 
 38  
 import com.allanbank.mongodb.Callback;
 39  
 import com.allanbank.mongodb.MongoClientConfiguration;
 40  
 import com.allanbank.mongodb.MongoDbException;
 41  
 import com.allanbank.mongodb.ReadPreference;
 42  
 import com.allanbank.mongodb.client.Message;
 43  
 import com.allanbank.mongodb.client.callback.ReplyCallback;
 44  
 import com.allanbank.mongodb.client.connection.Connection;
 45  
 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
 46  
 import com.allanbank.mongodb.client.state.Cluster;
 47  
 import com.allanbank.mongodb.error.ConnectionLostException;
 48  
 import com.allanbank.mongodb.util.log.Log;
 49  
 import com.allanbank.mongodb.util.log.LogFactory;
 50  
 
 51  
 /**
 52  
  * AbstractProxyMultipleConnection provides the core functionality for a
 53  
  * connection that multiplexes requests across multiple connections.
 54  
  * 
 55  
  * @param <K>
 56  
  *            The key used to track the various connections.
 57  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 58  
  *         mutated in incompatible ways between any two releases of the driver.
 59  
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 60  
  */
 61  
 public abstract class AbstractProxyMultipleConnection<K> implements Connection {
 62  
 
 63  
     /** The logger for the {@link AbstractProxyMultipleConnection}. */
 64  1
     private static final Log LOG = LogFactory
 65  
             .getLog(AbstractProxyMultipleConnection.class);
 66  
 
 67  
     /** The state of the cluster for finding secondary connections. */
 68  
     protected final Cluster myCluster;
 69  
 
 70  
     /** The MongoDB client configuration. */
 71  
     protected final MongoClientConfiguration myConfig;
 72  
 
 73  
     /** Support for emitting property change events. */
 74  
     protected final PropertyChangeSupport myEventSupport;
 75  
 
 76  
     /** The connection factory for opening secondary connections. */
 77  
     protected final ProxiedConnectionFactory myFactory;
 78  
 
 79  
     /** The most recently used connection. */
 80  
     protected final AtomicReference<Connection> myLastUsedConnection;
 81  
 
 82  
     /** The listener for changes in the cluster and connections. */
 83  
     protected final PropertyChangeListener myListener;
 84  
 
 85  
     /** The primary instance this connection is connected to. */
 86  
     protected volatile K myMainKey;
 87  
 
 88  
     /** Set to false when the connection is closed. */
 89  
     protected final AtomicBoolean myOpen;
 90  
 
 91  
     /** Set to true when the connection should be gracefully closed. */
 92  
     protected final AtomicBoolean myShutdown;
 93  
 
 94  
     /** The servers this connection is connected to. */
 95  
     /* package */final ConcurrentMap<K, Connection> myConnections;
 96  
 
 97  
     /**
 98  
      * Creates a new {@link AbstractProxyMultipleConnection}.
 99  
      * 
 100  
      * @param proxiedConnection
 101  
      *            The connection being proxied.
 102  
      * @param server
 103  
      *            The primary server this connection is connected to.
 104  
      * @param cluster
 105  
      *            The state of the cluster for finding secondary connections.
 106  
      * @param factory
 107  
      *            The connection factory for opening secondary connections.
 108  
      * @param config
 109  
      *            The MongoDB client configuration.
 110  
      */
 111  
     public AbstractProxyMultipleConnection(final Connection proxiedConnection,
 112  
             final K server, final Cluster cluster,
 113  
             final ProxiedConnectionFactory factory,
 114  52
             final MongoClientConfiguration config) {
 115  52
         myMainKey = server;
 116  52
         myCluster = cluster;
 117  52
         myFactory = factory;
 118  52
         myConfig = config;
 119  
 
 120  52
         myOpen = new AtomicBoolean(true);
 121  52
         myShutdown = new AtomicBoolean(false);
 122  52
         myEventSupport = new PropertyChangeSupport(this);
 123  52
         myConnections = new ConcurrentHashMap<K, Connection>();
 124  52
         myLastUsedConnection = new AtomicReference<Connection>(
 125  
                 proxiedConnection);
 126  
 
 127  52
         myListener = new ClusterAndConnectionListener();
 128  52
         myCluster.addListener(myListener);
 129  
 
 130  52
         if (proxiedConnection != null) {
 131  49
             cacheConnection(server, proxiedConnection);
 132  
         }
 133  52
     }
 134  
 
 135  
     /**
 136  
      * {@inheritDoc}
 137  
      * <p>
 138  
      * Overridden to add this listener to this connection's event source.
 139  
      * </p>
 140  
      */
 141  
     @Override
 142  
     public void addPropertyChangeListener(final PropertyChangeListener listener) {
 143  1
         myEventSupport.addPropertyChangeListener(listener);
 144  1
     }
 145  
 
 146  
     /**
 147  
      * Closes the underlying connection.
 148  
      * 
 149  
      * @see Connection#close()
 150  
      */
 151  
     @Override
 152  
     public void close() {
 153  
 
 154  51
         myOpen.set(false);
 155  51
         myCluster.removeListener(myListener);
 156  
 
 157  51
         for (final Connection conn : myConnections.values()) {
 158  
             try {
 159  56
                 conn.removePropertyChangeListener(myListener);
 160  56
                 conn.close();
 161  
             }
 162  1
             catch (final IOException ioe) {
 163  1
                 LOG.warn(ioe, "Could not close the connection: {}", conn);
 164  55
             }
 165  56
         }
 166  51
     }
 167  
 
 168  
     /**
 169  
      * {@inheritDoc}
 170  
      * <p>
 171  
      * Forwards the call to the proxied {@link Connection}.
 172  
      * </p>
 173  
      * 
 174  
      * @see java.io.Flushable#flush()
 175  
      */
 176  
     @Override
 177  
     public void flush() throws IOException {
 178  2
         for (final Connection conn : myConnections.values()) {
 179  
             try {
 180  4
                 conn.flush();
 181  
             }
 182  1
             catch (final IOException ioe) {
 183  1
                 LOG.warn(ioe, "Could not flush the connection: {}", conn);
 184  3
             }
 185  4
         }
 186  2
     }
 187  
 
 188  
     /**
 189  
      * {@inheritDoc}
 190  
      * <p>
 191  
      * Overridden to return the pending count for the last connection used to
 192  
      * send a message.
 193  
      * </p>
 194  
      */
 195  
     @Override
 196  
     public int getPendingCount() {
 197  1
         return myLastUsedConnection.get().getPendingCount();
 198  
     }
 199  
 
 200  
     /**
 201  
      * {@inheritDoc}
 202  
      * <p>
 203  
      * True if the connection is open and not shutting down.
 204  
      * </p>
 205  
      */
 206  
     @Override
 207  
     public boolean isAvailable() {
 208  19
         return isOpen() && !isShuttingDown();
 209  
     }
 210  
 
 211  
     /**
 212  
      * {@inheritDoc}
 213  
      * <p>
 214  
      * Overridden to return if the last used connection is idle.
 215  
      * </p>
 216  
      */
 217  
     @Override
 218  
     public boolean isIdle() {
 219  1
         return myLastUsedConnection.get().isIdle();
 220  
     }
 221  
 
 222  
     /**
 223  
      * {@inheritDoc}
 224  
      * <p>
 225  
      * Overridden to return if this connection has any open connections.
 226  
      * </p>
 227  
      */
 228  
     @Override
 229  
     public boolean isOpen() {
 230  23
         return myOpen.get();
 231  
     }
 232  
 
 233  
     /**
 234  
      * {@inheritDoc}
 235  
      * <p>
 236  
      * Overridden to return if the last used connection is shutting down.
 237  
      * </p>
 238  
      */
 239  
     @Override
 240  
     public boolean isShuttingDown() {
 241  19
         return myShutdown.get();
 242  
     }
 243  
 
 244  
     /**
 245  
      * {@inheritDoc}
 246  
      * <p>
 247  
      * Overridden to raise the errors with all of the underlying connections.
 248  
      * </p>
 249  
      */
 250  
     @Override
 251  
     public void raiseErrors(final MongoDbException exception) {
 252  1
         for (final Connection conn : myConnections.values()) {
 253  1
             conn.raiseErrors(exception);
 254  1
         }
 255  1
     }
 256  
 
 257  
     /**
 258  
      * {@inheritDoc}
 259  
      * <p>
 260  
      * Overridden to remove the listener from this connection.
 261  
      * </p>
 262  
      */
 263  
     @Override
 264  
     public void removePropertyChangeListener(
 265  
             final PropertyChangeListener listener) {
 266  1
         myEventSupport.removePropertyChangeListener(listener);
 267  1
     }
 268  
 
 269  
     /**
 270  
      * {@inheritDoc}
 271  
      * <p>
 272  
      * Locates all of the potential servers that can receive all of the
 273  
      * messages. Tries to then send the messages to a server with a connection
 274  
      * already open or failing that tries to open a connection to open of the
 275  
      * servers.
 276  
      * </p>
 277  
      */
 278  
     @Override
 279  
     public void send(final Message message1, final Message message2,
 280  
             final ReplyCallback replyCallback) throws MongoDbException {
 281  
 
 282  19
         if (!isAvailable()) {
 283  0
             throw new ConnectionLostException("Connection shutting down.");
 284  
         }
 285  
 
 286  19
         final List<K> servers = findPotentialKeys(message1, message2);
 287  14
         if (!trySend(servers, message1, message2, replyCallback)) {
 288  1
             throw new MongoDbException(
 289  
                     "Could not send the messages to any of the potential servers.");
 290  
         }
 291  13
     }
 292  
 
 293  
     /**
 294  
      * {@inheritDoc}
 295  
      * <p>
 296  
      * Locates all of the potential servers that can receive all of the
 297  
      * messages. Tries to then send the messages to a server with a connection
 298  
      * already open or failing that tries to open a connection to open of the
 299  
      * servers.
 300  
      * </p>
 301  
      */
 302  
     @Override
 303  
     public void send(final Message message, final ReplyCallback replyCallback)
 304  
             throws MongoDbException {
 305  14
         send(message, null, replyCallback);
 306  11
     }
 307  
 
 308  
     /**
 309  
      * {@inheritDoc}
 310  
      * <p>
 311  
      * Overridden to shutdown all of the underlying connections.
 312  
      * </p>
 313  
      */
 314  
     @Override
 315  
     public void shutdown(final boolean force) {
 316  2
         myShutdown.set(true);
 317  2
         for (final Connection conn : myConnections.values()) {
 318  1
             conn.shutdown(force);
 319  1
         }
 320  2
     }
 321  
 
 322  
     /**
 323  
      * {@inheritDoc}
 324  
      * <p>
 325  
      * Overridden to return the socket information.
 326  
      * </p>
 327  
      */
 328  
     @Override
 329  
     public String toString() {
 330  3
         return getConnectionType() + "(" + myLastUsedConnection.get() + ")";
 331  
     }
 332  
 
 333  
     /**
 334  
      * {@inheritDoc}
 335  
      * <p>
 336  
      * Overridden to wait for all of the underlying connections to close.
 337  
      * </p>
 338  
      */
 339  
     @Override
 340  
     public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
 341  2
         final long millis = timeoutUnits.toMillis(timeout);
 342  2
         long now = System.currentTimeMillis();
 343  2
         final long deadline = now + millis;
 344  
 
 345  2
         for (final Connection conn : myConnections.values()) {
 346  2
             if (now < deadline) {
 347  1
                 conn.waitForClosed((int) (deadline - now),
 348  
                         TimeUnit.MILLISECONDS);
 349  1
                 now = System.currentTimeMillis();
 350  
             }
 351  2
         }
 352  2
     }
 353  
 
 354  
     /**
 355  
      * Caches the connection to the server if there is not already a connection
 356  
      * in the cache. If there is a connection already in the cache then the one
 357  
      * provided is closed and the cached connection it returned.
 358  
      * 
 359  
      * @param server
 360  
      *            The server connected to.
 361  
      * @param conn
 362  
      *            The connection to cache, if possible.
 363  
      * @return The connection in the cache.
 364  
      */
 365  
     protected Connection cacheConnection(final K server, final Connection conn) {
 366  62
         final Connection existing = myConnections.putIfAbsent(server, conn);
 367  62
         if (existing != null) {
 368  1
             conn.shutdown(true);
 369  1
             return existing;
 370  
         }
 371  
 
 372  
         // Listener to the connection for it to close.
 373  61
         conn.addPropertyChangeListener(myListener);
 374  
 
 375  61
         return conn;
 376  
     }
 377  
 
 378  
     /**
 379  
      * Attempts to create a connection to the server, catching any exceptions
 380  
      * thrown. If a connection is created it should be
 381  
      * {@link #cacheConnection(Object, Connection) cached}.
 382  
      * 
 383  
      * @param server
 384  
      *            The server to connect to.
 385  
      * @return The connection to the server.
 386  
      */
 387  
     protected abstract Connection connect(final K server);
 388  
 
 389  
     /**
 390  
      * Returns the cached connection for the specified key. This method may
 391  
      * return {@code null}.
 392  
      * 
 393  
      * @param server
 394  
      *            The server connected to.
 395  
      * @return The connection in the cache.
 396  
      */
 397  
     protected Connection connection(final K server) {
 398  1
         return myConnections.get(server);
 399  
     }
 400  
 
 401  
     /**
 402  
      * Creates a exception for a reconnect failure.
 403  
      * 
 404  
      * @param message1
 405  
      *            The first message to send.
 406  
      * @param message2
 407  
      *            The second message to send.
 408  
      * @return The exception.
 409  
      */
 410  
     protected MongoDbException createReconnectFailure(final Message message1,
 411  
             final Message message2) {
 412  5
         final StringBuilder builder = new StringBuilder(
 413  
                 "Could not find any servers for the following set of read preferences: ");
 414  5
         final Set<ReadPreference> seen = new HashSet<ReadPreference>();
 415  5
         for (final Message message : Arrays.asList(message1, message2)) {
 416  10
             if (message != null) {
 417  7
                 final ReadPreference prefs = message.getReadPreference();
 418  7
                 if (seen.add(prefs)) {
 419  6
                     if (seen.size() > 1) {
 420  2
                         builder.append(", ");
 421  
                     }
 422  6
                     builder.append(prefs);
 423  
                 }
 424  
             }
 425  10
         }
 426  5
         builder.append('.');
 427  
 
 428  5
         return new MongoDbException(builder.toString());
 429  
     }
 430  
 
 431  
     /**
 432  
      * Sends the message on the connection.
 433  
      * 
 434  
      * @param conn
 435  
      *            The connection to send on.
 436  
      * @param message1
 437  
      *            The first message to send.
 438  
      * @param message2
 439  
      *            The second message to send, may be <code>null</code>.
 440  
      * @param reply
 441  
      *            The reply {@link Callback}.
 442  
      */
 443  
     protected void doSend(final Connection conn, final Message message1,
 444  
             final Message message2, final ReplyCallback reply) {
 445  
 
 446  
         // Use the connection for metrics etc.
 447  13
         myLastUsedConnection.lazySet(conn);
 448  
 
 449  13
         if (message2 == null) {
 450  11
             conn.send(message1, reply);
 451  
         }
 452  
         else {
 453  2
             conn.send(message1, message2, reply);
 454  
         }
 455  13
     }
 456  
 
 457  
     /**
 458  
      * Locates the set of servers that can be used to send the specified
 459  
      * messages. This method will attempt to connect to the primary server if
 460  
      * there is not a current connection to the primary.
 461  
      * 
 462  
      * @param message1
 463  
      *            The first message to send.
 464  
      * @param message2
 465  
      *            The second message to send. May be <code>null</code>.
 466  
      * @return The servers that can be used.
 467  
      * @throws MongoDbException
 468  
      *             On a failure to locate a server that all messages can be sent
 469  
      *             to.
 470  
      */
 471  
     protected abstract List<K> findPotentialKeys(final Message message1,
 472  
             final Message message2) throws MongoDbException;
 473  
 
 474  
     /**
 475  
      * Returns the type of connection (for logs, etc.).
 476  
      * 
 477  
      * @return The connection type.
 478  
      */
 479  
     protected abstract String getConnectionType();
 480  
 
 481  
     /**
 482  
      * Tries to reconnect previously open {@link Connection}s. If a connection
 483  
      * was being closed then cleans up the remaining state.
 484  
      * 
 485  
      * @param connection
 486  
      *            The connection that was closed.
 487  
      */
 488  
     protected synchronized void handleConnectionClosed(
 489  
             final Connection connection) {
 490  
 
 491  3
         if (!myOpen.get()) {
 492  0
             return;
 493  
         }
 494  
 
 495  3
         final K server = findKeyForConnection(connection);
 496  
 
 497  
         try {
 498  
             // If this is the last connection then go ahead and close this
 499  
             // replica set connection so the number of active connections can
 500  
             // shrink. Only close this connection on a graceful primary
 501  
             // shutdown to pick up when a primary change happens.
 502  3
             final K primary = myMainKey;
 503  3
             if ((myConnections.size() == 1)
 504  
                     && (!server.equals(primary) || connection.isShuttingDown())) {
 505  
 
 506  
                 // Mark this a graceful shutdown.
 507  0
                 removeCachedConnection(server, connection);
 508  0
                 shutdown(true);
 509  
 
 510  0
                 myEventSupport.firePropertyChange(Connection.OPEN_PROP_NAME,
 511  
                         true, isOpen());
 512  
             }
 513  
             // If the connection that closed was the primary then we need to
 514  
             // reconnect.
 515  3
             else if (server.equals(primary) && isOpen()) {
 516  
                 // Not sure who is primary any more.
 517  3
                 myMainKey = null;
 518  
 
 519  3
                 LOG.info("Primary MongoDB Connection closed: {}({}). "
 520  
                         + "Will try to reconnect.", getConnectionType(),
 521  
                         connection);
 522  
 
 523  
                 // Need to use the reconnect logic to find the new primary.
 524  3
                 final ConnectionInfo<K> newConn = reconnectMain();
 525  3
                 if (newConn != null) {
 526  1
                     removeCachedConnection(server, connection);
 527  1
                     updateMain(newConn);
 528  
                 }
 529  
                 // Else could not find a primary. Likely in a bad state but let
 530  
                 // the connection stay for secondary queries if we have another
 531  
                 // connection.
 532  2
                 else if (myConnections.size() == 1) {
 533  
                     // Mark this a graceful shutdown.
 534  1
                     removeCachedConnection(server, connection);
 535  1
                     shutdown(false);
 536  
 
 537  1
                     myEventSupport.firePropertyChange(
 538  
                             Connection.OPEN_PROP_NAME, true, isOpen());
 539  
                 }
 540  3
             }
 541  
             // Just remove the connection (above).
 542  
             else {
 543  0
                 LOG.debug("MongoDB Connection closed: {}({}).",
 544  
                         getConnectionType(), connection);
 545  
             }
 546  
         }
 547  
         finally {
 548  
             // Make sure we always remove the closed connection.
 549  3
             removeCachedConnection(server, connection);
 550  3
             connection.raiseErrors(new ConnectionLostException(
 551  
                     "Connection closed."));
 552  3
         }
 553  3
     }
 554  
 
 555  
     /**
 556  
      * Creates a connection back to the main server for this connection.
 557  
      * 
 558  
      * @return The information for the new connection.
 559  
      */
 560  
     protected abstract ConnectionInfo<K> reconnectMain();
 561  
 
 562  
     /**
 563  
      * Remove the connection from the cache.
 564  
      * 
 565  
      * @param key
 566  
      *            The key to remove the connection for.
 567  
      * @param connection
 568  
      *            The connection to remove (if known).
 569  
      */
 570  
     protected void removeCachedConnection(final Object key,
 571  
             final Connection connection) {
 572  7
         Connection conn = connection;
 573  7
         if (connection == null) {
 574  1
             conn = myConnections.remove(key);
 575  
         }
 576  6
         else if (!myConnections.remove(key, connection)) {
 577  
             // Different connection found.
 578  2
             conn = null;
 579  
         }
 580  
 
 581  7
         if (conn != null) {
 582  5
             conn.removePropertyChangeListener(myListener);
 583  5
             conn.shutdown(true);
 584  
         }
 585  7
     }
 586  
 
 587  
     /**
 588  
      * Tries to send the messages to the first server with either an open
 589  
      * connection or that we can open a connection to.
 590  
      * 
 591  
      * @param servers
 592  
      *            The servers the messages can be sent to.
 593  
      * @param message1
 594  
      *            The first message to send.
 595  
      * @param message2
 596  
      *            The second message to send. May be <code>null</code>.
 597  
      * @param reply
 598  
      *            The callback for the replies.
 599  
      * @return The true if the message was sent.
 600  
      */
 601  
     protected boolean trySend(final List<K> servers, final Message message1,
 602  
             final Message message2, final ReplyCallback reply) {
 603  14
         for (final K server : servers) {
 604  
 
 605  56
             Connection conn = myConnections.get(server);
 606  
 
 607  
             // See if we need to create a connection.
 608  56
             if (conn == null) {
 609  
                 // Create one.
 610  52
                 conn = connect(server);
 611  
             }
 612  4
             else if (!conn.isAvailable()) {
 613  
 
 614  1
                 removeCachedConnection(server, conn);
 615  
 
 616  1
                 final ReconnectStrategy strategy = myFactory
 617  
                         .getReconnectStrategy();
 618  1
                 conn = strategy.reconnect(conn);
 619  1
                 if (conn != null) {
 620  1
                     conn = cacheConnection(server, conn);
 621  
                 }
 622  
             }
 623  
 
 624  56
             if (conn != null) {
 625  13
                 doSend(conn, message1, message2, reply);
 626  13
                 return true;
 627  
             }
 628  43
         }
 629  
 
 630  1
         return false;
 631  
     }
 632  
 
 633  
     /**
 634  
      * Update the state with the new primary server.
 635  
      * 
 636  
      * @param newConn
 637  
      *            The new primary server.
 638  
      */
 639  
     protected void updateMain(final ConnectionInfo<K> newConn) {
 640  1
         myMainKey = newConn.getConnectionKey();
 641  
 
 642  
         // Add the connection to the cache. This also gets the listener
 643  
         // attached.
 644  1
         cacheConnection(newConn.getConnectionKey(), newConn.getConnection());
 645  1
     }
 646  
 
 647  
     /**
 648  
      * Finds the server for the connection.
 649  
      * 
 650  
      * @param connection
 651  
      *            The connection to remove.
 652  
      * @return The K for the connection.
 653  
      */
 654  
     private K findKeyForConnection(final Connection connection) {
 655  3
         for (final Map.Entry<K, Connection> entry : myConnections.entrySet()) {
 656  3
             if (entry.getValue() == connection) {
 657  3
                 return entry.getKey();
 658  
             }
 659  0
         }
 660  0
         return null;
 661  
     }
 662  
 
 663  
     /**
 664  
      * ClusterListener provides a listener for changes in the cluster.
 665  
      * 
 666  
      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
 667  
      *         be mutated in incompatible ways between any two releases of the
 668  
      *         driver.
 669  
      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 670  
      */
 671  52
     protected final class ClusterAndConnectionListener implements
 672  
             PropertyChangeListener {
 673  
         @Override
 674  
         public void propertyChange(final PropertyChangeEvent event) {
 675  11
             final String propName = event.getPropertyName();
 676  11
             if (Cluster.SERVER_PROP.equals(propName)
 677  
                     && (event.getNewValue() == null)) {
 678  
                 // A K has been removed. Close the connection.
 679  1
                 removeCachedConnection(event.getOldValue(), null);
 680  
             }
 681  10
             else if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
 682  
                     && Boolean.FALSE.equals(event.getNewValue())) {
 683  3
                 handleConnectionClosed((Connection) event.getSource());
 684  
             }
 685  11
         }
 686  
 
 687  
     }
 688  
 }