1 /*
2 * #%L
3 * AbstractProxyMultipleConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.connection.proxy;
22
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.beans.PropertyChangeSupport;
26 import java.io.IOException;
27 import java.util.Arrays;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentMap;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import com.allanbank.mongodb.Callback;
39 import com.allanbank.mongodb.MongoClientConfiguration;
40 import com.allanbank.mongodb.MongoDbException;
41 import com.allanbank.mongodb.ReadPreference;
42 import com.allanbank.mongodb.client.Message;
43 import com.allanbank.mongodb.client.callback.ReplyCallback;
44 import com.allanbank.mongodb.client.connection.Connection;
45 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
46 import com.allanbank.mongodb.client.state.Cluster;
47 import com.allanbank.mongodb.error.ConnectionLostException;
48 import com.allanbank.mongodb.util.log.Log;
49 import com.allanbank.mongodb.util.log.LogFactory;
50
51 /**
52 * AbstractProxyMultipleConnection provides the core functionality for a
53 * connection that multiplexes requests across multiple connections.
54 *
55 * @param <K>
56 * The key used to track the various connections.
57 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
58 * mutated in incompatible ways between any two releases of the driver.
59 * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
60 */
61 public abstract class AbstractProxyMultipleConnection<K> implements Connection {
62
63 /** The logger for the {@link AbstractProxyMultipleConnection}. */
64 private static final Log LOG = LogFactory
65 .getLog(AbstractProxyMultipleConnection.class);
66
67 /** The state of the cluster for finding secondary connections. */
68 protected final Cluster myCluster;
69
70 /** The MongoDB client configuration. */
71 protected final MongoClientConfiguration myConfig;
72
73 /** Support for emitting property change events. */
74 protected final PropertyChangeSupport myEventSupport;
75
76 /** The connection factory for opening secondary connections. */
77 protected final ProxiedConnectionFactory myFactory;
78
79 /** The most recently used connection. */
80 protected final AtomicReference<Connection> myLastUsedConnection;
81
82 /** The listener for changes in the cluster and connections. */
83 protected final PropertyChangeListener myListener;
84
85 /** The primary instance this connection is connected to. */
86 protected volatile K myMainKey;
87
88 /** Set to false when the connection is closed. */
89 protected final AtomicBoolean myOpen;
90
91 /** Set to true when the connection should be gracefully closed. */
92 protected final AtomicBoolean myShutdown;
93
94 /** The servers this connection is connected to. */
95 /* package */final ConcurrentMap<K, Connection> myConnections;
96
97 /**
98 * Creates a new {@link AbstractProxyMultipleConnection}.
99 *
100 * @param proxiedConnection
101 * The connection being proxied.
102 * @param server
103 * The primary server this connection is connected to.
104 * @param cluster
105 * The state of the cluster for finding secondary connections.
106 * @param factory
107 * The connection factory for opening secondary connections.
108 * @param config
109 * The MongoDB client configuration.
110 */
111 public AbstractProxyMultipleConnection(final Connection proxiedConnection,
112 final K server, final Cluster cluster,
113 final ProxiedConnectionFactory factory,
114 final MongoClientConfiguration config) {
115 myMainKey = server;
116 myCluster = cluster;
117 myFactory = factory;
118 myConfig = config;
119
120 myOpen = new AtomicBoolean(true);
121 myShutdown = new AtomicBoolean(false);
122 myEventSupport = new PropertyChangeSupport(this);
123 myConnections = new ConcurrentHashMap<K, Connection>();
124 myLastUsedConnection = new AtomicReference<Connection>(
125 proxiedConnection);
126
127 myListener = new ClusterAndConnectionListener();
128 myCluster.addListener(myListener);
129
130 if (proxiedConnection != null) {
131 cacheConnection(server, proxiedConnection);
132 }
133 }
134
135 /**
136 * {@inheritDoc}
137 * <p>
138 * Overridden to add this listener to this connection's event source.
139 * </p>
140 */
141 @Override
142 public void addPropertyChangeListener(final PropertyChangeListener listener) {
143 myEventSupport.addPropertyChangeListener(listener);
144 }
145
146 /**
147 * Closes the underlying connection.
148 *
149 * @see Connection#close()
150 */
151 @Override
152 public void close() {
153
154 myOpen.set(false);
155 myCluster.removeListener(myListener);
156
157 for (final Connection conn : myConnections.values()) {
158 try {
159 conn.removePropertyChangeListener(myListener);
160 conn.close();
161 }
162 catch (final IOException ioe) {
163 LOG.warn(ioe, "Could not close the connection: {}", conn);
164 }
165 }
166 }
167
168 /**
169 * {@inheritDoc}
170 * <p>
171 * Forwards the call to the proxied {@link Connection}.
172 * </p>
173 *
174 * @see java.io.Flushable#flush()
175 */
176 @Override
177 public void flush() throws IOException {
178 for (final Connection conn : myConnections.values()) {
179 try {
180 conn.flush();
181 }
182 catch (final IOException ioe) {
183 LOG.warn(ioe, "Could not flush the connection: {}", conn);
184 }
185 }
186 }
187
188 /**
189 * {@inheritDoc}
190 * <p>
191 * Overridden to return the pending count for the last connection used to
192 * send a message.
193 * </p>
194 */
195 @Override
196 public int getPendingCount() {
197 return myLastUsedConnection.get().getPendingCount();
198 }
199
200 /**
201 * {@inheritDoc}
202 * <p>
203 * True if the connection is open and not shutting down.
204 * </p>
205 */
206 @Override
207 public boolean isAvailable() {
208 return isOpen() && !isShuttingDown();
209 }
210
211 /**
212 * {@inheritDoc}
213 * <p>
214 * Overridden to return if the last used connection is idle.
215 * </p>
216 */
217 @Override
218 public boolean isIdle() {
219 return myLastUsedConnection.get().isIdle();
220 }
221
222 /**
223 * {@inheritDoc}
224 * <p>
225 * Overridden to return if this connection has any open connections.
226 * </p>
227 */
228 @Override
229 public boolean isOpen() {
230 return myOpen.get();
231 }
232
233 /**
234 * {@inheritDoc}
235 * <p>
236 * Overridden to return if the last used connection is shutting down.
237 * </p>
238 */
239 @Override
240 public boolean isShuttingDown() {
241 return myShutdown.get();
242 }
243
244 /**
245 * {@inheritDoc}
246 * <p>
247 * Overridden to raise the errors with all of the underlying connections.
248 * </p>
249 */
250 @Override
251 public void raiseErrors(final MongoDbException exception) {
252 for (final Connection conn : myConnections.values()) {
253 conn.raiseErrors(exception);
254 }
255 }
256
257 /**
258 * {@inheritDoc}
259 * <p>
260 * Overridden to remove the listener from this connection.
261 * </p>
262 */
263 @Override
264 public void removePropertyChangeListener(
265 final PropertyChangeListener listener) {
266 myEventSupport.removePropertyChangeListener(listener);
267 }
268
269 /**
270 * {@inheritDoc}
271 * <p>
272 * Locates all of the potential servers that can receive all of the
273 * messages. Tries to then send the messages to a server with a connection
274 * already open or failing that tries to open a connection to open of the
275 * servers.
276 * </p>
277 */
278 @Override
279 public void send(final Message message1, final Message message2,
280 final ReplyCallback replyCallback) throws MongoDbException {
281
282 if (!isAvailable()) {
283 throw new ConnectionLostException("Connection shutting down.");
284 }
285
286 final List<K> servers = findPotentialKeys(message1, message2);
287 if (!trySend(servers, message1, message2, replyCallback)) {
288 throw new MongoDbException(
289 "Could not send the messages to any of the potential servers.");
290 }
291 }
292
293 /**
294 * {@inheritDoc}
295 * <p>
296 * Locates all of the potential servers that can receive all of the
297 * messages. Tries to then send the messages to a server with a connection
298 * already open or failing that tries to open a connection to open of the
299 * servers.
300 * </p>
301 */
302 @Override
303 public void send(final Message message, final ReplyCallback replyCallback)
304 throws MongoDbException {
305 send(message, null, replyCallback);
306 }
307
308 /**
309 * {@inheritDoc}
310 * <p>
311 * Overridden to shutdown all of the underlying connections.
312 * </p>
313 */
314 @Override
315 public void shutdown(final boolean force) {
316 myShutdown.set(true);
317 for (final Connection conn : myConnections.values()) {
318 conn.shutdown(force);
319 }
320 }
321
322 /**
323 * {@inheritDoc}
324 * <p>
325 * Overridden to return the socket information.
326 * </p>
327 */
328 @Override
329 public String toString() {
330 return getConnectionType() + "(" + myLastUsedConnection.get() + ")";
331 }
332
333 /**
334 * {@inheritDoc}
335 * <p>
336 * Overridden to wait for all of the underlying connections to close.
337 * </p>
338 */
339 @Override
340 public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
341 final long millis = timeoutUnits.toMillis(timeout);
342 long now = System.currentTimeMillis();
343 final long deadline = now + millis;
344
345 for (final Connection conn : myConnections.values()) {
346 if (now < deadline) {
347 conn.waitForClosed((int) (deadline - now),
348 TimeUnit.MILLISECONDS);
349 now = System.currentTimeMillis();
350 }
351 }
352 }
353
354 /**
355 * Caches the connection to the server if there is not already a connection
356 * in the cache. If there is a connection already in the cache then the one
357 * provided is closed and the cached connection it returned.
358 *
359 * @param server
360 * The server connected to.
361 * @param conn
362 * The connection to cache, if possible.
363 * @return The connection in the cache.
364 */
365 protected Connection cacheConnection(final K server, final Connection conn) {
366 final Connection existing = myConnections.putIfAbsent(server, conn);
367 if (existing != null) {
368 conn.shutdown(true);
369 return existing;
370 }
371
372 // Listener to the connection for it to close.
373 conn.addPropertyChangeListener(myListener);
374
375 return conn;
376 }
377
378 /**
379 * Attempts to create a connection to the server, catching any exceptions
380 * thrown. If a connection is created it should be
381 * {@link #cacheConnection(Object, Connection) cached}.
382 *
383 * @param server
384 * The server to connect to.
385 * @return The connection to the server.
386 */
387 protected abstract Connection connect(final K server);
388
389 /**
390 * Returns the cached connection for the specified key. This method may
391 * return {@code null}.
392 *
393 * @param server
394 * The server connected to.
395 * @return The connection in the cache.
396 */
397 protected Connection connection(final K server) {
398 return myConnections.get(server);
399 }
400
401 /**
402 * Creates a exception for a reconnect failure.
403 *
404 * @param message1
405 * The first message to send.
406 * @param message2
407 * The second message to send.
408 * @return The exception.
409 */
410 protected MongoDbException createReconnectFailure(final Message message1,
411 final Message message2) {
412 final StringBuilder builder = new StringBuilder(
413 "Could not find any servers for the following set of read preferences: ");
414 final Set<ReadPreference> seen = new HashSet<ReadPreference>();
415 for (final Message message : Arrays.asList(message1, message2)) {
416 if (message != null) {
417 final ReadPreference prefs = message.getReadPreference();
418 if (seen.add(prefs)) {
419 if (seen.size() > 1) {
420 builder.append(", ");
421 }
422 builder.append(prefs);
423 }
424 }
425 }
426 builder.append('.');
427
428 return new MongoDbException(builder.toString());
429 }
430
431 /**
432 * Sends the message on the connection.
433 *
434 * @param conn
435 * The connection to send on.
436 * @param message1
437 * The first message to send.
438 * @param message2
439 * The second message to send, may be <code>null</code>.
440 * @param reply
441 * The reply {@link Callback}.
442 */
443 protected void doSend(final Connection conn, final Message message1,
444 final Message message2, final ReplyCallback reply) {
445
446 // Use the connection for metrics etc.
447 myLastUsedConnection.lazySet(conn);
448
449 if (message2 == null) {
450 conn.send(message1, reply);
451 }
452 else {
453 conn.send(message1, message2, reply);
454 }
455 }
456
457 /**
458 * Locates the set of servers that can be used to send the specified
459 * messages. This method will attempt to connect to the primary server if
460 * there is not a current connection to the primary.
461 *
462 * @param message1
463 * The first message to send.
464 * @param message2
465 * The second message to send. May be <code>null</code>.
466 * @return The servers that can be used.
467 * @throws MongoDbException
468 * On a failure to locate a server that all messages can be sent
469 * to.
470 */
471 protected abstract List<K> findPotentialKeys(final Message message1,
472 final Message message2) throws MongoDbException;
473
474 /**
475 * Returns the type of connection (for logs, etc.).
476 *
477 * @return The connection type.
478 */
479 protected abstract String getConnectionType();
480
481 /**
482 * Tries to reconnect previously open {@link Connection}s. If a connection
483 * was being closed then cleans up the remaining state.
484 *
485 * @param connection
486 * The connection that was closed.
487 */
488 protected synchronized void handleConnectionClosed(
489 final Connection connection) {
490
491 if (!myOpen.get()) {
492 return;
493 }
494
495 final K server = findKeyForConnection(connection);
496
497 try {
498 // If this is the last connection then go ahead and close this
499 // replica set connection so the number of active connections can
500 // shrink. Only close this connection on a graceful primary
501 // shutdown to pick up when a primary change happens.
502 final K primary = myMainKey;
503 if ((myConnections.size() == 1)
504 && (!server.equals(primary) || connection.isShuttingDown())) {
505
506 // Mark this a graceful shutdown.
507 removeCachedConnection(server, connection);
508 shutdown(true);
509
510 myEventSupport.firePropertyChange(Connection.OPEN_PROP_NAME,
511 true, isOpen());
512 }
513 // If the connection that closed was the primary then we need to
514 // reconnect.
515 else if (server.equals(primary) && isOpen()) {
516 // Not sure who is primary any more.
517 myMainKey = null;
518
519 LOG.info("Primary MongoDB Connection closed: {}({}). "
520 + "Will try to reconnect.", getConnectionType(),
521 connection);
522
523 // Need to use the reconnect logic to find the new primary.
524 final ConnectionInfo<K> newConn = reconnectMain();
525 if (newConn != null) {
526 removeCachedConnection(server, connection);
527 updateMain(newConn);
528 }
529 // Else could not find a primary. Likely in a bad state but let
530 // the connection stay for secondary queries if we have another
531 // connection.
532 else if (myConnections.size() == 1) {
533 // Mark this a graceful shutdown.
534 removeCachedConnection(server, connection);
535 shutdown(false);
536
537 myEventSupport.firePropertyChange(
538 Connection.OPEN_PROP_NAME, true, isOpen());
539 }
540 }
541 // Just remove the connection (above).
542 else {
543 LOG.debug("MongoDB Connection closed: {}({}).",
544 getConnectionType(), connection);
545 }
546 }
547 finally {
548 // Make sure we always remove the closed connection.
549 removeCachedConnection(server, connection);
550 connection.raiseErrors(new ConnectionLostException(
551 "Connection closed."));
552 }
553 }
554
555 /**
556 * Creates a connection back to the main server for this connection.
557 *
558 * @return The information for the new connection.
559 */
560 protected abstract ConnectionInfo<K> reconnectMain();
561
562 /**
563 * Remove the connection from the cache.
564 *
565 * @param key
566 * The key to remove the connection for.
567 * @param connection
568 * The connection to remove (if known).
569 */
570 protected void removeCachedConnection(final Object key,
571 final Connection connection) {
572 Connection conn = connection;
573 if (connection == null) {
574 conn = myConnections.remove(key);
575 }
576 else if (!myConnections.remove(key, connection)) {
577 // Different connection found.
578 conn = null;
579 }
580
581 if (conn != null) {
582 conn.removePropertyChangeListener(myListener);
583 conn.shutdown(true);
584 }
585 }
586
587 /**
588 * Tries to send the messages to the first server with either an open
589 * connection or that we can open a connection to.
590 *
591 * @param servers
592 * The servers the messages can be sent to.
593 * @param message1
594 * The first message to send.
595 * @param message2
596 * The second message to send. May be <code>null</code>.
597 * @param reply
598 * The callback for the replies.
599 * @return The true if the message was sent.
600 */
601 protected boolean trySend(final List<K> servers, final Message message1,
602 final Message message2, final ReplyCallback reply) {
603 for (final K server : servers) {
604
605 Connection conn = myConnections.get(server);
606
607 // See if we need to create a connection.
608 if (conn == null) {
609 // Create one.
610 conn = connect(server);
611 }
612 else if (!conn.isAvailable()) {
613
614 removeCachedConnection(server, conn);
615
616 final ReconnectStrategy strategy = myFactory
617 .getReconnectStrategy();
618 conn = strategy.reconnect(conn);
619 if (conn != null) {
620 conn = cacheConnection(server, conn);
621 }
622 }
623
624 if (conn != null) {
625 doSend(conn, message1, message2, reply);
626 return true;
627 }
628 }
629
630 return false;
631 }
632
633 /**
634 * Update the state with the new primary server.
635 *
636 * @param newConn
637 * The new primary server.
638 */
639 protected void updateMain(final ConnectionInfo<K> newConn) {
640 myMainKey = newConn.getConnectionKey();
641
642 // Add the connection to the cache. This also gets the listener
643 // attached.
644 cacheConnection(newConn.getConnectionKey(), newConn.getConnection());
645 }
646
647 /**
648 * Finds the server for the connection.
649 *
650 * @param connection
651 * The connection to remove.
652 * @return The K for the connection.
653 */
654 private K findKeyForConnection(final Connection connection) {
655 for (final Map.Entry<K, Connection> entry : myConnections.entrySet()) {
656 if (entry.getValue() == connection) {
657 return entry.getKey();
658 }
659 }
660 return null;
661 }
662
663 /**
664 * ClusterListener provides a listener for changes in the cluster.
665 *
666 * @api.no This class is <b>NOT</b> part of the drivers API. This class may
667 * be mutated in incompatible ways between any two releases of the
668 * driver.
669 * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
670 */
671 protected final class ClusterAndConnectionListener implements
672 PropertyChangeListener {
673 @Override
674 public void propertyChange(final PropertyChangeEvent event) {
675 final String propName = event.getPropertyName();
676 if (Cluster.SERVER_PROP.equals(propName)
677 && (event.getNewValue() == null)) {
678 // A K has been removed. Close the connection.
679 removeCachedConnection(event.getOldValue(), null);
680 }
681 else if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
682 && Boolean.FALSE.equals(event.getNewValue())) {
683 handleConnectionClosed((Connection) event.getSource());
684 }
685 }
686
687 }
688 }