1 /*
2 * #%L
3 * BootstrapConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.connection.bootstrap;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.util.List;
25 import java.util.concurrent.ExecutionException;
26 import java.util.logging.Level;
27
28 import com.allanbank.mongodb.MongoClientConfiguration;
29 import com.allanbank.mongodb.bson.Document;
30 import com.allanbank.mongodb.bson.Element;
31 import com.allanbank.mongodb.bson.element.StringElement;
32 import com.allanbank.mongodb.client.ClusterStats;
33 import com.allanbank.mongodb.client.ClusterType;
34 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
35 import com.allanbank.mongodb.client.connection.Connection;
36 import com.allanbank.mongodb.client.connection.ConnectionFactory;
37 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
38 import com.allanbank.mongodb.client.connection.auth.AuthenticationConnectionFactory;
39 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
40 import com.allanbank.mongodb.client.connection.rs.ReplicaSetConnectionFactory;
41 import com.allanbank.mongodb.client.connection.sharded.ShardedConnectionFactory;
42 import com.allanbank.mongodb.client.connection.socket.SocketConnectionFactory;
43 import com.allanbank.mongodb.client.message.IsMaster;
44 import com.allanbank.mongodb.client.message.Reply;
45 import com.allanbank.mongodb.client.state.Cluster;
46 import com.allanbank.mongodb.error.CannotConnectException;
47 import com.allanbank.mongodb.util.IOUtils;
48 import com.allanbank.mongodb.util.log.Log;
49 import com.allanbank.mongodb.util.log.LogFactory;
50
51 /**
52 * Provides the ability to bootstrap into the appropriate
53 * {@link ConnectionFactory} based on the configuration of the server(s)
54 * connected to.
55 *
56 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
57 * mutated in incompatible ways between any two releases of the driver.
58 * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
59 */
60 public class BootstrapConnectionFactory implements ConnectionFactory {
61
62 /** The logger for the {@link BootstrapConnectionFactory}. */
63 protected static final Log LOG = LogFactory
64 .getLog(BootstrapConnectionFactory.class);
65
66 /** The configuration for the connections to be created. */
67 private final MongoClientConfiguration myConfig;
68
69 /** The delegate connection factory post */
70 private ConnectionFactory myDelegate = null;
71
72 /**
73 * Creates a {@link BootstrapConnectionFactory}
74 *
75 * @param config
76 * The configuration to use in discovering the server
77 * configuration.
78 */
79 public BootstrapConnectionFactory(final MongoClientConfiguration config) {
80 myConfig = config;
81 }
82
83 /**
84 * {@inheritDoc}
85 * <p>
86 * Overridden to close the delegate {@link ConnectionFactory}.
87 * </p>
88 */
89 @Override
90 public void close() {
91 IOUtils.close(myDelegate);
92 }
93
94 /**
95 * {@inheritDoc}
96 * <p>
97 * Delegates the connection to the setup delegate.
98 * </p>
99 */
100 @Override
101 public Connection connect() throws IOException {
102 return getDelegate().connect();
103 }
104
105 /**
106 * {@inheritDoc}
107 * <p>
108 * Overridden to return the cluster stats of the proxied
109 * {@link ConnectionFactory}.
110 * </p>
111 */
112 @Override
113 public ClusterStats getClusterStats() {
114 return getDelegate().getClusterStats();
115 }
116
117 /**
118 * {@inheritDoc}
119 * <p>
120 * Overridden to return the cluster type of the delegate
121 * {@link ConnectionFactory}.
122 * </p>
123 */
124 @Override
125 public ClusterType getClusterType() {
126 return getDelegate().getClusterType();
127 }
128
129 /**
130 * {@inheritDoc}
131 * <p>
132 * Overridden to return the delegates strategy.
133 * </p>
134 */
135 @Override
136 public ReconnectStrategy getReconnectStrategy() {
137 return getDelegate().getReconnectStrategy();
138 }
139
140 /**
141 * Re-bootstraps the environment. Normally this method is only called once
142 * during the constructor of the factory to initialize the delegate but
143 * users can reset the delegate by manually invoking this method.
144 * <p>
145 * A bootstrap will issue one commands to the first working MongoDB process.
146 * The reply to the {@link IsMaster} command is used to detect connecting to
147 * a mongos <tt>process</tt> and by extension a Sharded configuration.
148 * </p>
149 * <p>
150 * If not using a Sharded configuration then the server status is checked
151 * for a <tt>repl</tt> element. If present a Replication Set configuration
152 * is assumed.
153 * </p>
154 * <p>
155 * If neither a Sharded or Replication Set is being used then a plain socket
156 * connection factory is used.
157 * </p>
158 */
159 protected void bootstrap() {
160 final SocketConnectionFactory socketFactory = new SocketConnectionFactory(
161 myConfig);
162 ProxiedConnectionFactory factory = socketFactory;
163
164 // Authentication has to be right on top of the physical
165 // connection.
166 if (myConfig.isAuthenticating()) {
167 factory = new AuthenticationConnectionFactory(factory, myConfig);
168 }
169
170 try {
171 // Use the socket factories cluster.
172 final Cluster cluster = socketFactory.getCluster();
173 for (final InetSocketAddress addr : myConfig.getServerAddresses()) {
174 Connection conn = null;
175 final FutureReplyCallback future = new FutureReplyCallback();
176 try {
177 conn = factory.connect(cluster.add(addr), myConfig);
178
179 conn.send(new IsMaster(), future);
180 final Reply reply = future.get();
181
182 // Close the connection now that we have the reply.
183 IOUtils.close(conn);
184
185 final List<Document> results = reply.getResults();
186 if (!results.isEmpty()) {
187 final Document doc = results.get(0);
188
189 if (isMongos(doc)) {
190 LOG.debug("Sharded bootstrap to {}.", addr);
191 cluster.clear(); // not needed.
192 myDelegate = bootstrapSharded(factory);
193 }
194 else if (isReplicationSet(doc)) {
195 LOG.debug("Replica-set bootstrap to {}.", addr);
196 cluster.clear(); // not needed.
197 myDelegate = bootstrapReplicaSet(factory);
198 }
199 else {
200 LOG.debug("Simple MongoDB bootstrap to {}.", addr);
201 myDelegate = factory;
202 }
203 factory = null; // Don't close.
204 return;
205 }
206 }
207 catch (final IOException ioe) {
208 LOG.warn(ioe, "I/O error during bootstrap to {}.", addr);
209 }
210 catch (final InterruptedException e) {
211 LOG.warn(e, "Interrupted during bootstrap to {}.", addr);
212 }
213 catch (final ExecutionException e) {
214 LOG.warn(e, "Error during bootstrap to {}.", addr);
215 }
216 finally {
217 IOUtils.close(conn, Level.WARNING,
218 "I/O error shutting down bootstrap connection to "
219 + addr + ".");
220 }
221 }
222 }
223 finally {
224 IOUtils.close(factory);
225 }
226 }
227
228 /**
229 * Initializes the factory for connecting to the replica set.
230 *
231 * @param factory
232 * The factory for connecting to the servers directly.
233 * @return The connection factory for connecting to the replica set.
234 */
235 protected ConnectionFactory bootstrapReplicaSet(
236 final ProxiedConnectionFactory factory) {
237 return new ReplicaSetConnectionFactory(factory, getConfig());
238 }
239
240 /**
241 * Initializes the factory for connecting to the sharded cluster.
242 *
243 * @param factory
244 * The factory for connecting to the servers directly.
245 * @return The connection factory for connecting to the sharded cluster.
246 */
247 protected ConnectionFactory bootstrapSharded(
248 final ProxiedConnectionFactory factory) {
249 return new ShardedConnectionFactory(factory, getConfig());
250 }
251
252 /**
253 * The configuration for the client.
254 *
255 * @return The configuration for the client.
256 */
257 protected MongoClientConfiguration getConfig() {
258 return myConfig;
259 }
260
261 /**
262 * Returns the underlying delegate factory.
263 *
264 * @return The underlying delegate factory.
265 */
266 protected ConnectionFactory getDelegate() {
267 if (myDelegate == null) {
268 return createDelegate();
269 }
270 return myDelegate;
271 }
272
273 /**
274 * Sets the underlying delegate factory.
275 *
276 * @param delegate
277 * The underlying delegate factory.
278 */
279 protected void setDelegate(final ConnectionFactory delegate) {
280 myDelegate = delegate;
281 }
282
283 /**
284 * Creates the delegate connection factory.
285 *
286 * @return The delegate connection factory.
287 */
288 private synchronized ConnectionFactory createDelegate() {
289 if (myDelegate == null) {
290 bootstrap();
291 if (myDelegate == null) {
292 LOG.warn("Could not bootstrap a connection to the MongoDB servers.");
293 throw new CannotConnectException(
294 "Could not bootstrap a connection to the MongoDB servers.");
295 }
296 }
297 return myDelegate;
298 }
299
300 /**
301 * Returns true if the document contains a "process" element that is a
302 * string and contains the value "mongos".
303 *
304 * @param doc
305 * The document to validate.
306 * @return True if the document contains a "process" element that is a
307 * string and contains the value "mongos".
308 */
309 private boolean isMongos(final Document doc) {
310
311 final Element processName = doc.get("msg");
312 if (processName instanceof StringElement) {
313 return "isdbgrid".equals(((StringElement) processName).getValue());
314 }
315
316 return false;
317 }
318
319 /**
320 * Returns true if the document contains a "repl" element that is a
321 * sub-document.
322 *
323 * @param doc
324 * The document to validate.
325 * @return True if the document contains a "repl" element that is a
326 * sub-document.
327 */
328 private boolean isReplicationSet(final Document doc) {
329 return (doc.get("setName") instanceof StringElement);
330 }
331 }