View Javadoc
1   /*
2    * #%L
3    * AbstractSocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.socket;
22  
23  import java.beans.PropertyChangeListener;
24  import java.beans.PropertyChangeSupport;
25  import java.io.BufferedOutputStream;
26  import java.io.EOFException;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InterruptedIOException;
30  import java.io.StreamCorruptedException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.net.SocketTimeoutException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import javax.net.SocketFactory;
41  
42  import com.allanbank.mongodb.MongoClientConfiguration;
43  import com.allanbank.mongodb.MongoDbException;
44  import com.allanbank.mongodb.Version;
45  import com.allanbank.mongodb.bson.io.BsonInputStream;
46  import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
47  import com.allanbank.mongodb.bson.io.StringDecoderCache;
48  import com.allanbank.mongodb.bson.io.StringEncoderCache;
49  import com.allanbank.mongodb.client.Message;
50  import com.allanbank.mongodb.client.Operation;
51  import com.allanbank.mongodb.client.VersionRange;
52  import com.allanbank.mongodb.client.callback.NoOpCallback;
53  import com.allanbank.mongodb.client.callback.Receiver;
54  import com.allanbank.mongodb.client.callback.ReplyCallback;
55  import com.allanbank.mongodb.client.callback.ReplyHandler;
56  import com.allanbank.mongodb.client.connection.Connection;
57  import com.allanbank.mongodb.client.connection.SocketConnectionListener;
58  import com.allanbank.mongodb.client.message.Delete;
59  import com.allanbank.mongodb.client.message.GetMore;
60  import com.allanbank.mongodb.client.message.Header;
61  import com.allanbank.mongodb.client.message.Insert;
62  import com.allanbank.mongodb.client.message.IsMaster;
63  import com.allanbank.mongodb.client.message.KillCursors;
64  import com.allanbank.mongodb.client.message.PendingMessage;
65  import com.allanbank.mongodb.client.message.PendingMessageQueue;
66  import com.allanbank.mongodb.client.message.Query;
67  import com.allanbank.mongodb.client.message.Reply;
68  import com.allanbank.mongodb.client.message.Update;
69  import com.allanbank.mongodb.client.state.Server;
70  import com.allanbank.mongodb.error.ConnectionLostException;
71  import com.allanbank.mongodb.error.DocumentToLargeException;
72  import com.allanbank.mongodb.error.ServerVersionException;
73  import com.allanbank.mongodb.util.IOUtils;
74  import com.allanbank.mongodb.util.log.Log;
75  import com.allanbank.mongodb.util.log.LogFactory;
76  
77  /**
78   * AbstractSocketConnection provides the basic functionality for a socket
79   * connection that passes messages between the sender and receiver.
80   * 
81   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
82   *         mutated in incompatible ways between any two releases of the driver.
83   * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
84   */
85  public abstract class AbstractSocketConnection implements Connection, Receiver {
86  
87      /** The length of the message header in bytes. */
88      public static final int HEADER_LENGTH = 16;
89  
90      /** The writer for BSON documents. Shares this objects {@link #myInput}. */
91      protected final BsonInputStream myBsonIn;
92  
93      /** The connections configuration. */
94      protected final MongoClientConfiguration myConfig;
95  
96      /** The cache for the encoding of strings. */
97      protected final StringEncoderCache myEncoderCache;
98  
99      /** Support for emitting property change events. */
100     protected final PropertyChangeSupport myEventSupport;
101 
102     /** The executor for the responses. */
103     protected final Executor myExecutor;
104 
105     /** The buffered input stream. */
106     protected final InputStream myInput;
107 
108     /** The logger for the connection. */
109     protected final Log myLog;
110 
111     /** Holds if the connection is open. */
112     protected final AtomicBoolean myOpen;
113 
114     /** The buffered output stream. */
115     protected final BufferedOutputStream myOutput;
116 
117     /** The queue of messages sent but waiting for a reply. */
118     protected final PendingMessageQueue myPendingQueue;
119 
120     /** The open socket. */
121     protected final Server myServer;
122 
123     /** Set to true when the connection should be gracefully closed. */
124     protected final AtomicBoolean myShutdown;
125 
126     /** The open socket. */
127     protected final Socket mySocket;
128 
129     /** Tracks the number of sequential read timeouts. */
130     private int myIdleTicks = 0;
131 
132     /** The {@link PendingMessage} used for the local cached copy. */
133     private final PendingMessage myPendingMessage = new PendingMessage();
134 
135     /** Set to true when the sender discovers they are the receive thread. */
136     private final AtomicInteger myReaderNeedsToFlush = new AtomicInteger(0);
137 
138     /**
139      * Creates a new AbstractSocketConnection.
140      * 
141      * @param server
142      *            The MongoDB server to connect to.
143      * @param config
144      *            The configuration for the Connection to the MongoDB server.
145      * @param encoderCache
146      *            Cache used for encoding strings.
147      * @param decoderCache
148      *            Cache used for decoding strings.
149      * @throws SocketException
150      *             On a failure connecting to the MongoDB server.
151      * @throws IOException
152      *             On a failure to read or write data to the MongoDB server.
153      */
154     public AbstractSocketConnection(final Server server,
155             final MongoClientConfiguration config,
156             final StringEncoderCache encoderCache,
157             final StringDecoderCache decoderCache) throws SocketException,
158             IOException {
159         super();
160 
161         myServer = server;
162         myConfig = config;
163         myEncoderCache = encoderCache;
164 
165         myLog = LogFactory.getLog(getClass());
166 
167         myExecutor = config.getExecutor();
168         myEventSupport = new PropertyChangeSupport(this);
169         myOpen = new AtomicBoolean(false);
170         myShutdown = new AtomicBoolean(false);
171 
172         mySocket = openSocket(server, config);
173         updateSocketWithOptions(config);
174 
175         myOpen.set(true);
176 
177         myInput = mySocket.getInputStream();
178         myBsonIn = new BsonInputStream(myInput, decoderCache);
179 
180         // Careful with the size of the buffer here. Seems Java likes to call
181         // madvise(..., MADV_DONTNEED) for buffers over a certain size.
182         // Net effect is that the performance of the system goes down the
183         // drain. Some numbers using the
184         // UnixDomainSocketAccepatanceTest.testMultiFetchiterator
185         // 1M ==> More than a minute...
186         // 512K ==> 24 seconds
187         // 256K ==> 16.9 sec.
188         // 128K ==> 17 sec.
189         // 64K ==> 17 sec.
190         // 32K ==> 16.5 sec.
191         // Based on those numbers we set the buffer to 32K as larger does not
192         // improve performance.
193         myOutput = new BufferedOutputStream(mySocket.getOutputStream(),
194                 32 * 1024);
195 
196         myPendingQueue = new PendingMessageQueue(
197                 config.getMaxPendingOperationsPerConnection(),
198                 config.getLockType());
199     }
200 
201     /**
202      * {@inheritDoc}
203      * <p>
204      * Overridden to add the listener to this connection.
205      * </p>
206      */
207     @Override
208     public void addPropertyChangeListener(final PropertyChangeListener listener) {
209         myEventSupport.addPropertyChangeListener(listener);
210     }
211 
212     /**
213      * {@inheritDoc}
214      */
215     @Override
216     public void flush() throws IOException {
217         myReaderNeedsToFlush.set(0);
218         myOutput.flush();
219     }
220 
221     /**
222      * {@inheritDoc}
223      */
224     @Override
225     public int getPendingCount() {
226         return myPendingQueue.size();
227     }
228 
229     /**
230      * {@inheritDoc}
231      * <p>
232      * Overridden to returns the server's name.
233      * </p>
234      */
235     @Override
236     public String getServerName() {
237         return myServer.getCanonicalName();
238     }
239 
240     /**
241      * {@inheritDoc}
242      * <p>
243      * True if the connection is open and not shutting down.
244      * </p>
245      */
246     @Override
247     public boolean isAvailable() {
248         return isOpen() && !isShuttingDown();
249     }
250 
251     /**
252      * {@inheritDoc}
253      * <p>
254      * True if the send and pending queues are empty.
255      * </p>
256      */
257     @Override
258     public boolean isIdle() {
259         return myPendingQueue.isEmpty();
260     }
261 
262     /**
263      * {@inheritDoc}
264      * <p>
265      * True if the connection has not been closed.
266      * </p>
267      */
268     @Override
269     public boolean isOpen() {
270         return myOpen.get();
271     }
272 
273     /**
274      * {@inheritDoc}
275      */
276     @Override
277     public boolean isShuttingDown() {
278         return myShutdown.get();
279     }
280 
281     /**
282      * {@inheritDoc}
283      * <p>
284      * Notifies the appropriate messages of the error.
285      * </p>
286      */
287     @Override
288     public void raiseErrors(final MongoDbException exception) {
289         final PendingMessage message = new PendingMessage();
290 
291         while (myPendingQueue.poll(message)) {
292             raiseError(exception, message.getReplyCallback());
293         }
294     }
295 
296     /**
297      * {@inheritDoc}
298      * <p>
299      * Overridden to remove the listener from this connection.
300      * </p>
301      */
302     @Override
303     public void removePropertyChangeListener(
304             final PropertyChangeListener listener) {
305         myEventSupport.removePropertyChangeListener(listener);
306     }
307 
308     /**
309      * {@inheritDoc}
310      * <p>
311      * Overridden to mark the socket as shutting down and tickles the sender to
312      * make sure that happens as soon as possible.
313      * </p>
314      */
315     @Override
316     public void shutdown(final boolean force) {
317         // Mark
318         myShutdown.set(true);
319 
320         if (force) {
321             IOUtils.close(this);
322         }
323         else {
324             if (isOpen()) {
325                 // Force a message with a callback to wake the receiver up.
326                 send(new IsMaster(), new NoOpCallback());
327             }
328         }
329     }
330 
331     /**
332      * Starts the connection.
333      */
334     public abstract void start();
335 
336     /**
337      * Stops the socket connection by calling {@link #shutdown(boolean)
338      * shutdown(false)}.
339      */
340     public void stop() {
341         shutdown(false);
342     }
343 
344     /**
345      * {@inheritDoc}
346      * <p>
347      * Overridden to return the socket information.
348      * </p>
349      */
350     @Override
351     public String toString() {
352         return "MongoDB(" + mySocket.getLocalPort() + "-->"
353                 + mySocket.getRemoteSocketAddress() + ")";
354     }
355 
356     /**
357      * {@inheritDoc}
358      * <p>
359      * If there is a pending flush then flushes.
360      * </p>
361      * <p>
362      * If there is any available data then does a single receive.
363      * </p>
364      */
365     @Override
366     public void tryReceive() {
367         try {
368             doReceiverFlush();
369 
370             if ((myBsonIn.available() > 0) || (myInput.available() > 0)) {
371                 doReceiveOne();
372             }
373         }
374         catch (final IOException error) {
375             myLog.info(
376                     "Received an error when checking for pending messages: {}.",
377                     error.getMessage());
378         }
379     }
380 
381     /**
382      * {@inheritDoc}
383      * <p>
384      * Waits for the connections pending queues to empty.
385      * </p>
386      */
387     @Override
388     public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
389         long now = System.currentTimeMillis();
390         final long deadline = now + timeoutUnits.toMillis(timeout);
391 
392         while (isOpen() && (now < deadline)) {
393             try {
394                 // A slow spin loop.
395                 TimeUnit.MILLISECONDS.sleep(10);
396             }
397             catch (final InterruptedException e) {
398                 // Ignore.
399                 e.hashCode();
400             }
401             now = System.currentTimeMillis();
402         }
403     }
404 
405     /**
406      * Receives a single message from the connection.
407      * 
408      * @return The {@link Message} received.
409      * @throws MongoDbException
410      *             On an error receiving the message.
411      */
412     protected Message doReceive() throws MongoDbException {
413         try {
414             int length;
415             try {
416                 length = readIntSuppressTimeoutOnNonFirstByte();
417             }
418             catch (final SocketTimeoutException ok) {
419                 // This is OK. We check if we are still running and come right
420                 // back.
421                 return null;
422             }
423 
424             myBsonIn.prefetch(length - 4);
425 
426             final int requestId = myBsonIn.readInt();
427             final int responseId = myBsonIn.readInt();
428             final int opCode = myBsonIn.readInt();
429 
430             final Operation op = Operation.fromCode(opCode);
431             if (op == null) {
432                 // Huh? Dazed and confused
433                 throw new MongoDbException("Unexpected operation read '"
434                         + opCode + "'.");
435             }
436 
437             final Header header = new Header(length, requestId, responseId, op);
438             Message message = null;
439             switch (op) {
440             case REPLY:
441                 message = new Reply(header, myBsonIn);
442                 break;
443             case QUERY:
444                 message = new Query(header, myBsonIn);
445                 break;
446             case UPDATE:
447                 message = new Update(myBsonIn);
448                 break;
449             case INSERT:
450                 message = new Insert(header, myBsonIn);
451                 break;
452             case GET_MORE:
453                 message = new GetMore(myBsonIn);
454                 break;
455             case DELETE:
456                 message = new Delete(myBsonIn);
457                 break;
458             case KILL_CURSORS:
459                 message = new KillCursors(myBsonIn);
460                 break;
461             }
462 
463             myServer.incrementRepliesReceived();
464 
465             return message;
466         }
467 
468         catch (final IOException ioe) {
469             final MongoDbException error = new ConnectionLostException(ioe);
470 
471             shutdown(error, (ioe instanceof InterruptedIOException));
472 
473             throw error;
474         }
475     }
476 
477     /**
478      * Receives and process a single message.
479      */
480     protected void doReceiveOne() {
481 
482         doReceiverFlush();
483 
484         final Message received = doReceive();
485         if (received instanceof Reply) {
486             myIdleTicks = 0;
487             final Reply reply = (Reply) received;
488             final int replyId = reply.getResponseToId();
489             boolean took = false;
490 
491             // Keep polling the pending queue until we get to
492             // message based on a matching replyId.
493             try {
494                 took = myPendingQueue.poll(myPendingMessage);
495                 while (took && (myPendingMessage.getMessageId() != replyId)) {
496 
497                     final MongoDbException noReply = new MongoDbException(
498                             "No reply received.");
499 
500                     // Note that this message will not get a reply.
501                     raiseError(noReply, myPendingMessage.getReplyCallback());
502 
503                     // Keep looking.
504                     took = myPendingQueue.poll(myPendingMessage);
505                 }
506 
507                 if (took) {
508                     // Must be the pending message's reply.
509                     reply(reply, myPendingMessage);
510                 }
511                 else {
512                     myLog.warn("Could not find the callback for reply '{}'.",
513                             +replyId);
514                 }
515             }
516             finally {
517                 myPendingMessage.clear();
518             }
519         }
520         else if (received != null) {
521             myLog.warn("Received a non-Reply message: {}.", received);
522             shutdown(new ConnectionLostException(new StreamCorruptedException(
523                     "Received a non-Reply message: " + received)), false);
524         }
525         else {
526             myIdleTicks += 1;
527 
528             if (myConfig.getMaxIdleTickCount() <= myIdleTicks) {
529                 // Shutdown the connection., nicely.
530                 shutdown(false);
531             }
532         }
533     }
534 
535     /**
536      * Sends a single message to the connection.
537      * 
538      * @param messageId
539      *            The id to use for the message.
540      * @param message
541      *            The message to send.
542      * @throws IOException
543      *             On a failure sending the message.
544      */
545     protected void doSend(final int messageId,
546             final RandomAccessOutputStream message) throws IOException {
547         message.writeTo(myOutput);
548         message.reset();
549 
550         myServer.incrementMessagesSent();
551     }
552 
553     /**
554      * Should be called when the send of a message happens on the receive
555      * thread. The sender should not flush the {@link #myOutput}. Instead the
556      * receive thread will {@link #flush()} once it has consumed all of the
557      * pending messages to be received.
558      */
559     protected void markReaderNeedsToFlush() {
560         myReaderNeedsToFlush.incrementAndGet();
561     }
562 
563     /**
564      * Updates to raise an error on the callback, if any.
565      * 
566      * @param exception
567      *            The thrown exception.
568      * @param replyCallback
569      *            The callback for the reply to the message.
570      */
571     protected void raiseError(final Throwable exception,
572             final ReplyCallback replyCallback) {
573         ReplyHandler.raiseError(exception, replyCallback, myExecutor);
574     }
575 
576     /**
577      * Reads a little-endian 4 byte signed integer from the stream.
578      * 
579      * @return The integer value.
580      * @throws EOFException
581      *             On insufficient data for the integer.
582      * @throws IOException
583      *             On a failure reading the integer.
584      */
585     protected int readIntSuppressTimeoutOnNonFirstByte() throws EOFException,
586             IOException {
587         int read = 0;
588         int eofCheck = 0;
589         int result = 0;
590 
591         read = myBsonIn.read();
592         eofCheck |= read;
593         result += (read << 0);
594 
595         for (int i = Byte.SIZE; i < Integer.SIZE; i += Byte.SIZE) {
596             try {
597                 read = myBsonIn.read();
598             }
599             catch (final SocketTimeoutException ste) {
600                 // Bad - Only the first byte should timeout.
601                 throw new IOException(ste);
602             }
603             eofCheck |= read;
604             result += (read << i);
605         }
606 
607         if (eofCheck < 0) {
608             throw new EOFException("Remote connection closed: "
609                     + mySocket.getRemoteSocketAddress());
610         }
611         return result;
612     }
613 
614     /**
615      * Updates to set the reply for the callback, if any.
616      * 
617      * @param reply
618      *            The reply.
619      * @param pendingMessage
620      *            The pending message.
621      */
622     protected void reply(final Reply reply, final PendingMessage pendingMessage) {
623 
624         final long latency = pendingMessage.latency();
625 
626         // Add the latency.
627         if (latency > 0) {
628             myServer.updateAverageLatency(latency);
629         }
630 
631         final ReplyCallback callback = pendingMessage.getReplyCallback();
632         ReplyHandler.reply(this, reply, callback, myExecutor);
633     }
634 
635     /**
636      * Sends a single message.
637      * 
638      * @param pendingMessage
639      *            The message to be sent.
640      * @param message
641      *            The message that has already been encoded/serialized. This may
642      *            be <code>null</code> in which case the message is streamed to
643      *            the socket.
644      * @throws InterruptedException
645      *             If the thread is interrupted waiting for a message to send.
646      * @throws IOException
647      *             On a failure sending the message.
648      */
649     protected final void send(final PendingMessage pendingMessage,
650             final RandomAccessOutputStream message)
651             throws InterruptedException, IOException {
652 
653         final int messageId = pendingMessage.getMessageId();
654 
655         // Mark the timestamp.
656         pendingMessage.timestampNow();
657 
658         // Make sure the message is on the queue before the
659         // message is sent to ensure the receive thread can
660         // assume an empty pending queue means that there is
661         // no message for the reply.
662         if ((pendingMessage.getReplyCallback() != null)
663                 && !myPendingQueue.offer(pendingMessage)) {
664             // Push what we have out before blocking.
665             flush();
666             myPendingQueue.put(pendingMessage);
667         }
668 
669         doSend(messageId, message);
670 
671         // If shutting down then flush after each message.
672         if (myShutdown.get()) {
673             flush();
674         }
675     }
676 
677     /**
678      * Shutsdown the connection on an error.
679      * 
680      * @param error
681      *            The error causing the shutdown.
682      * @param receiveError
683      *            If true then the socket experienced a receive error.
684      */
685     protected void shutdown(final MongoDbException error,
686             final boolean receiveError) {
687         if (receiveError) {
688             myServer.connectionTerminated();
689         }
690 
691         // Have to assume all of the requests have failed that are pending.
692         final PendingMessage message = new PendingMessage();
693         while (myPendingQueue.poll(message)) {
694             raiseError(error, message.getReplyCallback());
695         }
696 
697         closeQuietly();
698     }
699 
700     /**
701      * Updates the socket with the configuration's socket options.
702      * 
703      * @param config
704      *            The configuration to apply.
705      * @throws SocketException
706      *             On a failure setting the socket options.
707      */
708     protected void updateSocketWithOptions(final MongoClientConfiguration config)
709             throws SocketException {
710         mySocket.setKeepAlive(config.isUsingSoKeepalive());
711         mySocket.setSoTimeout(config.getReadTimeout());
712         try {
713             mySocket.setTcpNoDelay(true);
714         }
715         catch (final SocketException seIgnored) {
716             // The junixsocket implementation does not support TCP_NO_DELAY,
717             // which makes sense but it throws an exception instead of silently
718             // ignoring - ignore it here.
719             if (!"AFUNIXSocketException".equals(seIgnored.getClass()
720                     .getSimpleName())) {
721                 throw seIgnored;
722             }
723         }
724         mySocket.setPerformancePreferences(1, 5, 6);
725     }
726 
727     /**
728      * Ensures that the documents in the message do not exceed the maximum size
729      * allowed by MongoDB.
730      * 
731      * @param message1
732      *            The message to be sent to the server.
733      * @param message2
734      *            The second message to be sent to the server.
735      * @throws DocumentToLargeException
736      *             On a message being too large.
737      * @throws ServerVersionException
738      *             If one of the messages cannot be sent to the server version.
739      */
740     protected void validate(final Message message1, final Message message2)
741             throws DocumentToLargeException, ServerVersionException {
742 
743         final Version serverVersion = myServer.getVersion();
744         final int maxBsonSize = myServer.getMaxBsonObjectSize();
745 
746         message1.validateSize(maxBsonSize);
747         validateVersion(message1, serverVersion);
748 
749         if (message2 != null) {
750             message2.validateSize(maxBsonSize);
751             validateVersion(message1, serverVersion);
752         }
753     }
754 
755     /**
756      * Closes the connection to the server without allowing an exception to be
757      * thrown.
758      */
759     private void closeQuietly() {
760         try {
761             close();
762         }
763         catch (final IOException e) {
764             myLog.warn(e, "I/O exception trying to shutdown the connection.");
765         }
766     }
767 
768     /**
769      * Check if the handler for a message dropped data in the send buffer that
770      * it did not flush to avoid a deadlock with the server. If so then flush
771      * that message.
772      */
773     private void doReceiverFlush() {
774         try {
775             final int unflushedMessages = myReaderNeedsToFlush.get();
776             if ((unflushedMessages != 0)
777                     && (myPendingQueue.size() <= unflushedMessages)) {
778                 flush();
779             }
780         }
781         catch (final IOException ignored) {
782             myLog.warn("Error flushing data to the server: "
783                     + ignored.getMessage());
784         }
785     }
786 
787     /**
788      * Tries to open a connection to the server.
789      * 
790      * @param server
791      *            The server to open the connection to.
792      * @param config
793      *            The configuration for attempting to open the connection.
794      * @return The opened {@link Socket}.
795      * @throws IOException
796      *             On a failure opening a connection to the server.
797      */
798     private Socket openSocket(final Server server,
799             final MongoClientConfiguration config) throws IOException {
800         final SocketFactory factory = config.getSocketFactory();
801 
802         IOException last = null;
803         Socket socket = null;
804         for (final InetSocketAddress address : myServer.getAddresses()) {
805             try {
806 
807                 socket = factory.createSocket();
808                 socket.connect(address, config.getConnectTimeout());
809 
810                 // If the factory wants to know about the connection then let it
811                 // know first.
812                 if (factory instanceof SocketConnectionListener) {
813                     ((SocketConnectionListener) factory).connected(address,
814                             socket);
815                 }
816 
817                 // Let the server know the working connection.
818                 server.connectionOpened(address);
819 
820                 last = null;
821                 break;
822             }
823             catch (final IOException error) {
824                 last = error;
825                 try {
826                     if (socket != null) {
827                         socket.close();
828                     }
829                 }
830                 catch (final IOException ignore) {
831                     myLog.info(
832                             "Could not close the defunct socket connection: {}",
833                             socket);
834                 }
835             }
836 
837         }
838         if (last != null) {
839             server.connectFailed();
840             throw last;
841         }
842 
843         return socket;
844     }
845 
846     /**
847      * Validates that the server we are about to send the message to knows how
848      * to handle the message.
849      * 
850      * @param message
851      *            The message to be sent.
852      * @param serverVersion
853      *            The server version.
854      * @throws ServerVersionException
855      *             If the messages cannot be sent to the server version.
856      */
857     private void validateVersion(final Message message,
858             final Version serverVersion) throws ServerVersionException {
859         final VersionRange range = message.getRequiredVersionRange();
860         if ((range != null) && !range.contains(serverVersion)) {
861             throw new ServerVersionException(message.getOperationName(), range,
862                     serverVersion, message);
863         }
864     }
865 }