Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
ShardedConnectionFactory |
|
| 2.0526315789473686;2.053 | ||||
ShardedConnectionFactory$BootstrapState |
|
| 2.0526315789473686;2.053 |
1 | /* | |
2 | * #%L | |
3 | * ShardedConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client.connection.sharded; | |
21 | ||
22 | import java.io.IOException; | |
23 | import java.net.InetSocketAddress; | |
24 | import java.util.List; | |
25 | import java.util.concurrent.ExecutionException; | |
26 | import java.util.logging.Level; | |
27 | ||
28 | import com.allanbank.mongodb.MongoClientConfiguration; | |
29 | import com.allanbank.mongodb.MongoDbException; | |
30 | import com.allanbank.mongodb.ReadPreference; | |
31 | import com.allanbank.mongodb.bson.Document; | |
32 | import com.allanbank.mongodb.bson.Element; | |
33 | import com.allanbank.mongodb.bson.element.StringElement; | |
34 | import com.allanbank.mongodb.builder.Find; | |
35 | import com.allanbank.mongodb.client.ClusterStats; | |
36 | import com.allanbank.mongodb.client.ClusterType; | |
37 | import com.allanbank.mongodb.client.Message; | |
38 | import com.allanbank.mongodb.client.callback.FutureReplyCallback; | |
39 | import com.allanbank.mongodb.client.connection.Connection; | |
40 | import com.allanbank.mongodb.client.connection.ConnectionFactory; | |
41 | import com.allanbank.mongodb.client.connection.ReconnectStrategy; | |
42 | import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory; | |
43 | import com.allanbank.mongodb.client.message.GetMore; | |
44 | import com.allanbank.mongodb.client.message.Query; | |
45 | import com.allanbank.mongodb.client.message.Reply; | |
46 | import com.allanbank.mongodb.client.state.Cluster; | |
47 | import com.allanbank.mongodb.client.state.ClusterPinger; | |
48 | import com.allanbank.mongodb.client.state.LatencyServerSelector; | |
49 | import com.allanbank.mongodb.client.state.Server; | |
50 | import com.allanbank.mongodb.client.state.ServerSelector; | |
51 | import com.allanbank.mongodb.util.IOUtils; | |
52 | import com.allanbank.mongodb.util.log.Log; | |
53 | import com.allanbank.mongodb.util.log.LogFactory; | |
54 | ||
55 | /** | |
56 | * Provides the ability to create connections to a shard configuration via | |
57 | * mongos servers. | |
58 | * | |
59 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
60 | * mutated in incompatible ways between any two releases of the driver. | |
61 | * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved | |
62 | */ | |
63 | public class ShardedConnectionFactory implements ConnectionFactory { | |
64 | ||
65 | /** The logger for the {@link ShardedConnectionFactory}. */ | |
66 | 1 | protected static final Log LOG = LogFactory |
67 | .getLog(ShardedConnectionFactory.class); | |
68 | ||
69 | /** The state of the cluster. */ | |
70 | protected final Cluster myCluster; | |
71 | ||
72 | /** The MongoDB client configuration. */ | |
73 | protected final MongoClientConfiguration myConfig; | |
74 | ||
75 | /** The factory to create proxied connections. */ | |
76 | protected final ProxiedConnectionFactory myConnectionFactory; | |
77 | ||
78 | /** Pings the servers in the cluster collecting latency and tags. */ | |
79 | protected final ClusterPinger myPinger; | |
80 | ||
81 | /** The selector for the mongos instance to use. */ | |
82 | protected final ServerSelector mySelector; | |
83 | ||
84 | /** | |
85 | * Creates a new {@link ShardedConnectionFactory}. | |
86 | * | |
87 | * @param factory | |
88 | * The factory to create proxied connections. | |
89 | * @param config | |
90 | * The initial configuration. | |
91 | */ | |
92 | public ShardedConnectionFactory(final ProxiedConnectionFactory factory, | |
93 | 16 | final MongoClientConfiguration config) { |
94 | 16 | myConnectionFactory = factory; |
95 | 16 | myConfig = config; |
96 | 16 | myCluster = createCluster(config); |
97 | 16 | mySelector = createSelector(); |
98 | 16 | myPinger = createClusterPinger(factory, config); |
99 | ||
100 | // Add all of the servers to the cluster. | |
101 | 16 | for (final InetSocketAddress address : config.getServerAddresses()) { |
102 | 16 | myCluster.add(address); |
103 | 16 | } |
104 | ||
105 | 16 | bootstrap(); |
106 | 16 | } |
107 | ||
108 | /** | |
109 | * Finds the mongos servers. | |
110 | */ | |
111 | public void bootstrap() { | |
112 | 16 | final BootstrapState state = createBootstrapState(); |
113 | 16 | if (!state.done()) { |
114 | 14 | for (final Server addr : myCluster.getServers()) { |
115 | 13 | Connection conn = null; |
116 | try { | |
117 | // Send the request... | |
118 | 13 | conn = myConnectionFactory.connect(addr, myConfig); |
119 | ||
120 | 12 | update(state, conn); |
121 | ||
122 | 8 | if (state.done()) { |
123 | break; | |
124 | } | |
125 | } | |
126 | 1 | catch (final IOException ioe) { |
127 | 1 | LOG.warn(ioe, "I/O error during sharded bootstrap to {}.", |
128 | addr); | |
129 | } | |
130 | 2 | catch (final MongoDbException me) { |
131 | 2 | LOG.warn(me, |
132 | "MongoDB error during sharded bootstrap to {}.", | |
133 | addr); | |
134 | } | |
135 | 1 | catch (final InterruptedException e) { |
136 | 1 | LOG.warn(e, "Interrupted during sharded bootstrap to {}.", |
137 | addr); | |
138 | } | |
139 | 1 | catch (final ExecutionException e) { |
140 | 1 | LOG.warn(e, "Error during sharded bootstrap to {}.", addr); |
141 | } | |
142 | finally { | |
143 | 10 | IOUtils.close(conn, Level.WARNING, |
144 | "I/O error shutting down sharded bootstrap connection to " | |
145 | + addr + "."); | |
146 | 8 | } |
147 | 8 | } |
148 | } | |
149 | ||
150 | // Last thing is to start the ping of servers. This will get the tags | |
151 | // and latencies updated. | |
152 | 16 | myPinger.initialSweep(myCluster); |
153 | 16 | myPinger.start(); |
154 | 16 | } |
155 | ||
156 | /** | |
157 | * {@inheritDoc} | |
158 | * <p> | |
159 | * Overridden to close the cluster state and the | |
160 | * {@link ProxiedConnectionFactory}. | |
161 | * </p> | |
162 | */ | |
163 | @Override | |
164 | public void close() { | |
165 | 17 | IOUtils.close(myPinger); |
166 | 17 | IOUtils.close(myConnectionFactory); |
167 | 17 | } |
168 | ||
169 | /** | |
170 | * Creates a new connection to the shared mongos servers. | |
171 | * | |
172 | * @see ConnectionFactory#connect() | |
173 | */ | |
174 | @Override | |
175 | public Connection connect() throws IOException { | |
176 | 5 | IOException lastError = null; |
177 | 5 | for (final Server server : mySelector.pickServers()) { |
178 | try { | |
179 | 3 | final Connection primaryConn = myConnectionFactory.connect( |
180 | server, myConfig); | |
181 | ||
182 | 2 | return wrap(primaryConn, server); |
183 | } | |
184 | 1 | catch (final IOException e) { |
185 | 1 | lastError = e; |
186 | } | |
187 | 1 | } |
188 | ||
189 | 3 | if (lastError != null) { |
190 | 1 | throw lastError; |
191 | } | |
192 | ||
193 | 2 | throw new IOException( |
194 | "Could not determine a shard server to connect to."); | |
195 | } | |
196 | ||
197 | /** | |
198 | * {@inheritDoc} | |
199 | * <p> | |
200 | * Overridden to return the {@link Cluster}. | |
201 | * </p> | |
202 | */ | |
203 | @Override | |
204 | public ClusterStats getClusterStats() { | |
205 | 0 | return myCluster; |
206 | } | |
207 | ||
208 | /** | |
209 | * {@inheritDoc} | |
210 | * <p> | |
211 | * Overridden to return {@link ClusterType#SHARDED} cluster type. | |
212 | * </p> | |
213 | */ | |
214 | @Override | |
215 | public ClusterType getClusterType() { | |
216 | 2 | return ClusterType.SHARDED; |
217 | } | |
218 | ||
219 | /** | |
220 | * {@inheritDoc} | |
221 | * <p> | |
222 | * Overridden to return the delegates strategy but replace his state and | |
223 | * selector with our own. | |
224 | * </p> | |
225 | */ | |
226 | @Override | |
227 | public ReconnectStrategy getReconnectStrategy() { | |
228 | 2 | final ReconnectStrategy delegates = myConnectionFactory |
229 | .getReconnectStrategy(); | |
230 | ||
231 | 2 | delegates.setState(myCluster); |
232 | 2 | delegates.setSelector(mySelector); |
233 | 2 | delegates.setConnectionFactory(myConnectionFactory); |
234 | ||
235 | 2 | return delegates; |
236 | } | |
237 | ||
238 | /** | |
239 | * Creates a new {@link BootstrapState}. | |
240 | * | |
241 | * @return The {@link BootstrapState} to track state of loading the cluster | |
242 | * information. | |
243 | */ | |
244 | protected BootstrapState createBootstrapState() { | |
245 | 16 | return new BootstrapState(!myConfig.isAutoDiscoverServers()); |
246 | } | |
247 | ||
248 | /** | |
249 | * Creates a {@link Cluster} object to track the state of the servers across | |
250 | * the cluster. | |
251 | * | |
252 | * @param config | |
253 | * The configuration for the cluster. | |
254 | * @return The {@link Cluster} to track the servers across the cluster. | |
255 | */ | |
256 | protected Cluster createCluster(final MongoClientConfiguration config) { | |
257 | 16 | return new Cluster(config, ClusterType.SHARDED); |
258 | } | |
259 | ||
260 | /** | |
261 | * Creates a {@link ClusterPinger} object to periodically update the status | |
262 | * of the servers. | |
263 | * | |
264 | * @param factory | |
265 | * The factory for creating the connections to the servers. | |
266 | * @param config | |
267 | * The configuration for the client. | |
268 | * | |
269 | * @return The {@link ClusterPinger} object to periodically update the | |
270 | * status of the servers. | |
271 | */ | |
272 | protected ClusterPinger createClusterPinger( | |
273 | final ProxiedConnectionFactory factory, | |
274 | final MongoClientConfiguration config) { | |
275 | 16 | return new ClusterPinger(myCluster, factory, config); |
276 | } | |
277 | ||
278 | /** | |
279 | * Creates a {@link ServerSelector} object to select the (presumed) optimal | |
280 | * server to handle a request. | |
281 | * <p> | |
282 | * For a sharded cluster this defaults to the {@link LatencyServerSelector}. | |
283 | * </p> | |
284 | * | |
285 | * @return The {@link ServerSelector} object to select the (presumed) | |
286 | * optimal server to handle a request. | |
287 | */ | |
288 | protected ServerSelector createSelector() { | |
289 | 16 | return new LatencyServerSelector(myCluster, true); |
290 | } | |
291 | ||
292 | /** | |
293 | * Performs a find on the <tt>config</tt> database's <tt>mongos</tt> | |
294 | * collection to return the id for all of the mongos servers in the cluster. | |
295 | * <p> | |
296 | * A single mongos entry looks like: <blockquote> | |
297 | * | |
298 | * <pre> | |
299 | * <code> | |
300 | * { | |
301 | * "_id" : "mongos.example.com:27017", | |
302 | * "ping" : ISODate("2011-12-05T23:54:03.122Z"), | |
303 | * "up" : 330 | |
304 | * } | |
305 | * </code> | |
306 | * </pre> | |
307 | * | |
308 | * </blockquote> | |
309 | * | |
310 | * @param conn | |
311 | * The connection to request from. | |
312 | * @return True if the configuration servers have been determined. | |
313 | * @throws ExecutionException | |
314 | * On a failure to recover the response from the server. | |
315 | * @throws InterruptedException | |
316 | * On a failure to receive a response from the server. | |
317 | */ | |
318 | protected boolean findMongosServers(final Connection conn) | |
319 | throws InterruptedException, ExecutionException { | |
320 | 12 | boolean found = false; |
321 | ||
322 | // Create a query to pull all of the mongos servers out of the | |
323 | // config database. | |
324 | 12 | Message message = new Query("config", "mongos", Find.ALL, |
325 | /* fields= */null, /* batchSize= */0, | |
326 | /* limit= */0, /* numberToSkip= */0, /* tailable= */false, | |
327 | ReadPreference.PRIMARY, /* noCursorTimeout= */false, | |
328 | /* awaitData= */false, /* exhaust= */false, /* partial= */ | |
329 | false); | |
330 | ||
331 | 20 | while (message != null) { |
332 | // Send the request... | |
333 | 12 | final FutureReplyCallback future = new FutureReplyCallback(); |
334 | 12 | conn.send(message, future); |
335 | ||
336 | // Don's send it again. | |
337 | 10 | message = null; |
338 | ||
339 | // Receive the response. | |
340 | 10 | final Reply reply = future.get(); |
341 | ||
342 | // Validate and pull out the response information. | |
343 | 8 | final List<Document> docs = reply.getResults(); |
344 | 8 | for (final Document doc : docs) { |
345 | 11 | final Element idElem = doc.get("_id"); |
346 | 11 | if (idElem instanceof StringElement) { |
347 | 9 | final StringElement id = (StringElement) idElem; |
348 | ||
349 | 9 | myCluster.add(id.getValue()); |
350 | 9 | found = true; |
351 | ||
352 | 9 | LOG.debug("Adding shard mongos: {}", id.getValue()); |
353 | } | |
354 | 11 | } |
355 | ||
356 | // Cursor? | |
357 | 8 | if (reply.getCursorId() != 0) { |
358 | // Send a GetMore. | |
359 | 0 | message = new GetMore("config", "mongos", reply.getCursorId(), |
360 | 0, ReadPreference.PRIMARY); | |
361 | } | |
362 | 8 | } |
363 | ||
364 | 8 | return found; |
365 | } | |
366 | ||
367 | /** | |
368 | * Returns the clusterState value. | |
369 | * | |
370 | * @return The clusterState value. | |
371 | */ | |
372 | protected Cluster getCluster() { | |
373 | 3 | return myCluster; |
374 | } | |
375 | ||
376 | /** | |
377 | * Queries for the addresses for the {@code mongos} servers via the | |
378 | * {@link #findMongosServers(Connection)} method. | |
379 | * | |
380 | * @param state | |
381 | * The state of the bootstrap to be updated. | |
382 | * @param conn | |
383 | * The connection to use to locate the {@code mongos} servers | |
384 | * @throws InterruptedException | |
385 | * On a failure to wait for the reply to the query due to the | |
386 | * thread being interrupted. | |
387 | * @throws ExecutionException | |
388 | * On a failure to execute the query. | |
389 | */ | |
390 | protected void update(final BootstrapState state, final Connection conn) | |
391 | throws InterruptedException, ExecutionException { | |
392 | 12 | if (state.isMongosFound() || findMongosServers(conn)) { |
393 | 5 | state.setMongosFound(true); |
394 | } | |
395 | 8 | } |
396 | ||
397 | /** | |
398 | * Wraps the connection in a shard-aware connection. | |
399 | * | |
400 | * @param primaryConn | |
401 | * The primary shard connection. | |
402 | * @param server | |
403 | * The server the connection is connected to. | |
404 | * @return The wrapped connection. | |
405 | */ | |
406 | protected Connection wrap(final Connection primaryConn, final Server server) { | |
407 | 2 | return new ShardedConnection(primaryConn, server, myCluster, |
408 | mySelector, myConnectionFactory, myConfig); | |
409 | } | |
410 | ||
411 | /** | |
412 | * BootstrapState provides the ability to track the state of the bootstrap | |
413 | * for the sharded cluster. | |
414 | * | |
415 | * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved | |
416 | */ | |
417 | protected static class BootstrapState { | |
418 | /** Tracks if the {@code mongos} servers have been located. */ | |
419 | private boolean myMongosFound; | |
420 | ||
421 | /** | |
422 | * Creates a new BootstrapState. | |
423 | * | |
424 | * @param mongosFound | |
425 | * Initials if we should look for the {@code mongos} servers. | |
426 | */ | |
427 | 16 | protected BootstrapState(final boolean mongosFound) { |
428 | 16 | myMongosFound = mongosFound; |
429 | 16 | } |
430 | ||
431 | /** | |
432 | * Indicates when the bootstrap is complete. | |
433 | * <p> | |
434 | * This method returns true if auto discovery is turned off or (if on) | |
435 | * when all of the {@code mongos} servers have been located. | |
436 | * | |
437 | * @return True once the boot strap is complete. | |
438 | */ | |
439 | public boolean done() { | |
440 | 24 | return myMongosFound; |
441 | } | |
442 | ||
443 | /** | |
444 | * Returns true if the {@code mongos} servers have been found, false | |
445 | * otherwise. | |
446 | * | |
447 | * @return True if the {@code mongos} servers have been found, false | |
448 | * otherwise. | |
449 | */ | |
450 | public boolean isMongosFound() { | |
451 | 12 | return myMongosFound; |
452 | } | |
453 | ||
454 | /** | |
455 | * Sets if the the {@code mongos} servers have been found. | |
456 | * | |
457 | * @param mongosFound | |
458 | * If true, the {@code mongos} servers have been found, false | |
459 | * otherwise. | |
460 | */ | |
461 | public void setMongosFound(final boolean mongosFound) { | |
462 | 5 | myMongosFound = mongosFound; |
463 | 5 | } |
464 | } | |
465 | } |